enhance: add streaming coord and node grpc service register (#34655)

issue: #33285

- register streaming coord service into datacoord.
- add new streaming node role.
- add global static switch to enable streaming service or not.

Signed-off-by: chyezh <chyezh@outlook.com>
pull/34969/head
chyezh 2024-07-25 12:19:44 +08:00 committed by GitHub
parent 4f6cbfd520
commit bef06e5acf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1017 additions and 35 deletions

View File

@ -0,0 +1,44 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package components
import (
"context"
"github.com/milvus-io/milvus/internal/distributed/streamingnode"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type StreamingNode struct {
*streamingnode.Server
}
// NewStreamingNode creates a new StreamingNode
func NewStreamingNode(_ context.Context, _ dependency.Factory) (*StreamingNode, error) {
svr, err := streamingnode.NewServer()
if err != nil {
return nil, err
}
return &StreamingNode{
Server: svr,
}, nil
}
func (q *StreamingNode) GetName() string {
return typeutil.StreamingNodeRole
}

View File

@ -21,6 +21,7 @@ import (
"github.com/milvus-io/milvus/cmd/roles"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/hardware"
@ -123,7 +124,7 @@ func removePidFile(lock *flock.Flock) {
func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
alias, enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord, enableQueryNode,
enableDataNode, enableIndexNode, enableProxy := formatFlags(args, flags)
enableDataNode, enableIndexNode, enableProxy, enableStreamingNode := formatFlags(args, flags)
serverType := args[2]
role := roles.NewMilvusRoles()
@ -147,6 +148,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableIndexCoord = true
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StreamingNodeRole:
streamingutil.MustEnableStreamingService()
role.EnableStreamingNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.EnableRootCoord = true
role.EnableProxy = true
@ -156,6 +160,9 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableDataNode = true
role.EnableIndexCoord = true
role.EnableIndexNode = true
if streamingutil.IsStreamingServiceEnabled() {
role.EnableStreamingNode = true
}
role.Local = true
role.Embedded = serverType == typeutil.EmbeddedRole
case typeutil.MixtureRole:
@ -167,6 +174,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
role.EnableDataNode = enableDataNode
role.EnableIndexNode = enableIndexNode
role.EnableProxy = enableProxy
role.EnableStreamingNode = enableStreamingNode
default:
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp())
os.Exit(-1)
@ -177,6 +185,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCoord, enableQueryCoord,
enableIndexCoord, enableDataCoord, enableQueryNode, enableDataNode, enableIndexNode, enableProxy bool,
enableStreamingNode bool,
) {
flags.StringVar(&alias, "alias", "", "set alias")
@ -189,6 +198,11 @@ func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCo
flags.BoolVar(&enableDataNode, typeutil.DataNodeRole, false, "enable data node")
flags.BoolVar(&enableIndexNode, typeutil.IndexNodeRole, false, "enable index node")
flags.BoolVar(&enableProxy, typeutil.ProxyRole, false, "enable proxy node")
flags.BoolVar(&enableStreamingNode, typeutil.StreamingNodeRole, false, "enable streaming node")
if enableStreamingNode {
streamingutil.MustEnableStreamingService()
}
serverType := args[2]
if serverType == typeutil.EmbeddedRole {

View File

@ -138,6 +138,7 @@ type MilvusRoles struct {
EnableDataNode bool `env:"ENABLE_DATA_NODE"`
EnableIndexCoord bool `env:"ENABLE_INDEX_COORD"`
EnableIndexNode bool `env:"ENABLE_INDEX_NODE"`
EnableStreamingNode bool `env:"ENABLE_STREAMING_NODE"`
Local bool
Alias string
@ -207,6 +208,11 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync
return runComponent(ctx, localMsg, wg, components.NewDataCoord, metrics.RegisterDataCoord)
}
func (mr *MilvusRoles) runStreamingNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
wg.Add(1)
return runComponent(ctx, localMsg, wg, components.NewStreamingNode, metrics.RegisterStreamingNode)
}
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
wg.Add(1)
return runComponent(ctx, localMsg, wg, components.NewDataNode, metrics.RegisterDataNode)
@ -374,7 +380,7 @@ func (mr *MilvusRoles) Run() {
componentMap := make(map[string]component)
var rootCoord, queryCoord, indexCoord, dataCoord component
var proxy, dataNode, indexNode, queryNode component
var proxy, dataNode, indexNode, queryNode, streamingNode component
if mr.EnableRootCoord {
rootCoord = mr.runRootCoord(ctx, local, &wg)
componentMap[typeutil.RootCoordRole] = rootCoord
@ -414,6 +420,11 @@ func (mr *MilvusRoles) Run() {
componentMap[typeutil.ProxyRole] = proxy
}
if mr.EnableStreamingNode {
streamingNode = mr.runStreamingNode(ctx, local, &wg)
componentMap[typeutil.StreamingNodeRole] = streamingNode
}
wg.Wait()
http.RegisterStopComponent(func(role string) error {
@ -480,7 +491,7 @@ func (mr *MilvusRoles) Run() {
log.Info("All coordinators have stopped")
// stop nodes
nodes := []component{queryNode, indexNode, dataNode}
nodes := []component{queryNode, indexNode, dataNode, streamingNode}
for idx, node := range nodes {
if node != nil {
log.Info("stop node", zap.Int("idx", idx), zap.Any("node", node))

View File

@ -32,6 +32,7 @@ import (
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/broker"
@ -43,9 +44,11 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -155,6 +158,9 @@ type Server struct {
// manage ways that data coord access other coord
broker broker.Broker
// streamingcoord server is embedding in datacoord now.
streamingCoord *streamingcoord.Server
}
type CollectionNameInfo struct {
@ -303,6 +309,11 @@ func (s *Server) Init() error {
}
s.startDataCoord()
log.Info("DataCoord startup success")
if s.streamingCoord != nil {
s.streamingCoord.Start()
log.Info("StreamingCoord stratup successfully at standby mode")
}
return nil
}
s.stateCode.Store(commonpb.StateCode_StandBy)
@ -313,6 +324,10 @@ func (s *Server) Init() error {
return s.initDataCoord()
}
func (s *Server) RegisterStreamingCoordGRPCService(server *grpc.Server) {
s.streamingCoord.RegisterGRPCService(server)
}
func (s *Server) initDataCoord() error {
s.stateCode.Store(commonpb.StateCode_Initializing)
var err error
@ -334,6 +349,18 @@ func (s *Server) initDataCoord() error {
return err
}
// Initialize streaming coordinator.
if streamingutil.IsStreamingServiceEnabled() {
s.streamingCoord = streamingcoord.NewServerBuilder().
WithETCD(s.etcdCli).
WithMetaKV(s.kv).
WithSession(s.session).Build()
if err = s.streamingCoord.Init(context.TODO()); err != nil {
return err
}
log.Info("init streaming coordinator done")
}
s.handler = newServerHandler(s)
// check whether old node exist, if yes suspend auto balance until all old nodes down
@ -390,6 +417,10 @@ func (s *Server) Start() error {
if !s.enableActiveStandBy {
s.startDataCoord()
log.Info("DataCoord startup successfully")
if s.streamingCoord != nil {
s.streamingCoord.Start()
log.Info("StreamingCoord stratup successfully")
}
}
return nil
@ -1009,6 +1040,12 @@ func (s *Server) Stop() error {
s.garbageCollector.close()
logutil.Logger(s.ctx).Info("datacoord garbage collector stopped")
if s.streamingCoord != nil {
log.Info("StreamingCoord stoping...")
s.streamingCoord.Stop()
log.Info("StreamingCoord stopped")
}
s.stopServerLoop()
s.importScheduler.Close()
@ -1035,7 +1072,6 @@ func (s *Server) Stop() error {
s.stopServerLoop()
logutil.Logger(s.ctx).Info("datacoord serverloop stopped")
logutil.Logger(s.ctx).Warn("datacoord stop successful")
return nil
}

View File

@ -43,6 +43,8 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/streamingutil"
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
@ -120,14 +122,14 @@ func (s *Server) init() error {
log.Info("Connected to tikv. Using tikv as metadata storage.")
}
err = s.startGrpc()
if err != nil {
log.Debug("DataCoord startGrpc failed", zap.Error(err))
if err := s.dataCoord.Init(); err != nil {
log.Error("dataCoord init error", zap.Error(err))
return err
}
if err := s.dataCoord.Init(); err != nil {
log.Error("dataCoord init error", zap.Error(err))
err = s.startGrpc()
if err != nil {
log.Debug("DataCoord startGrpc failed", zap.Error(err))
return err
}
return nil
@ -184,6 +186,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
@ -195,9 +198,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)))
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
datapb.RegisterDataCoordServer(s.grpcServer, s)
// register the streaming coord grpc service.
if streamingutil.IsStreamingServiceEnabled() {
s.dataCoord.RegisterStreamingCoordGRPCService(s.grpcServer)
}
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrChan <- err

View File

@ -0,0 +1,345 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package streamingnode
import (
"context"
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
"github.com/cockroachdb/errors"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
streamingnodeserver "github.com/milvus-io/milvus/internal/streamingnode/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/sessionutil"
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/interceptor"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// Server is the grpc server of streamingnode.
type Server struct {
stopOnce sync.Once
grpcServerChan chan struct{}
// session of current server.
session *sessionutil.Session
// server
streamingnode *streamingnodeserver.Server
// rpc
grpcServer *grpc.Server
lis net.Listener
// component client
etcdCli *clientv3.Client
rootCoord types.RootCoordClient
dataCoord types.DataCoordClient
}
// NewServer create a new StreamingNode server.
func NewServer() (*Server, error) {
return &Server{
stopOnce: sync.Once{},
grpcServerChan: make(chan struct{}),
}, nil
}
// Run runs the server.
func (s *Server) Run() error {
if err := s.init(); err != nil {
return err
}
log.Info("streamingnode init done ...")
if err := s.start(); err != nil {
return err
}
log.Info("streamingnode start done ...")
return nil
}
// Stop stops the server, should be call after Run returned.
func (s *Server) Stop() (err error) {
s.stopOnce.Do(s.stop)
return nil
}
// stop stops the server.
func (s *Server) stop() {
addr, _ := s.getAddress()
log.Info("streamingnode stop", zap.String("Address", addr))
// Unregister current server from etcd.
log.Info("streamingnode unregister session from etcd...")
if err := s.session.GoingStop(); err != nil {
log.Warn("streamingnode unregister session failed", zap.Error(err))
}
// Stop grpc server.
log.Info("streamingnode stop grpc server...")
s.grpcServer.GracefulStop()
// Stop StreamingNode service.
log.Info("streamingnode stop service...")
s.streamingnode.Stop()
// Stop all session
log.Info("streamingnode stop session...")
s.session.Stop()
// Stop rootCoord client.
log.Info("streamingnode stop rootCoord client...")
if err := s.rootCoord.Close(); err != nil {
log.Warn("streamingnode stop rootCoord client failed", zap.Error(err))
}
// Wait for grpc server to stop.
log.Info("wait for grpc server stop...")
<-s.grpcServerChan
log.Info("streamingnode stop done")
}
// Health check the health status of streamingnode.
func (s *Server) Health(ctx context.Context) commonpb.StateCode {
return s.streamingnode.Health(ctx)
}
func (s *Server) init() (err error) {
defer func() {
if err != nil {
log.Error("StreamingNode init failed", zap.Error(err))
return
}
log.Info("init StreamingNode server finished")
}()
// Create etcd client.
s.etcdCli, _ = kvfactory.GetEtcdAndPath()
if err := s.allocateAddress(); err != nil {
return err
}
if err := s.initSession(); err != nil {
return err
}
if err := s.initRootCoord(); err != nil {
return err
}
if err := s.initDataCoord(); err != nil {
return err
}
s.initGRPCServer()
// Create StreamingNode service.
s.streamingnode = streamingnodeserver.NewServerBuilder().
WithETCD(s.etcdCli).
WithGRPCServer(s.grpcServer).
WithRootCoordClient(s.rootCoord).
WithDataCoordClient(s.dataCoord).
WithSession(s.session).
Build()
if err := s.streamingnode.Init(context.Background()); err != nil {
return errors.Wrap(err, "StreamingNode service init failed")
}
return nil
}
func (s *Server) start() (err error) {
defer func() {
if err != nil {
log.Error("StreamingNode start failed", zap.Error(err))
return
}
log.Info("start StreamingNode server finished")
}()
// Start StreamingNode service.
s.streamingnode.Start()
// Start grpc server.
if err := s.startGPRCServer(); err != nil {
return errors.Wrap(err, "StreamingNode start gRPC server fail")
}
// Register current server to etcd.
s.registerSessionToETCD()
return nil
}
func (s *Server) initSession() error {
s.session = sessionutil.NewSession(context.Background())
if s.session == nil {
return errors.New("session is nil, the etcd client connection may have failed")
}
addr, err := s.getAddress()
if err != nil {
return err
}
s.session.Init(typeutil.StreamingNodeRole, addr, false, true)
paramtable.SetNodeID(s.session.ServerID)
log.Info("StreamingNode init session", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("node address", addr))
return nil
}
func (s *Server) initRootCoord() (err error) {
log.Info("StreamingNode connect to rootCoord...")
s.rootCoord, err = rcc.NewClient(context.Background())
if err != nil {
return errors.Wrap(err, "StreamingNode try to new RootCoord client failed")
}
log.Info("StreamingNode try to wait for RootCoord ready")
err = componentutil.WaitForComponentHealthy(context.Background(), s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for RootCoord ready failed")
}
return nil
}
func (s *Server) initDataCoord() (err error) {
log.Info("StreamingNode connect to dataCoord...")
s.dataCoord, err = dcc.NewClient(context.Background())
if err != nil {
return errors.Wrap(err, "StreamingNode try to new DataCoord client failed")
}
log.Info("StreamingNode try to wait for DataCoord ready")
err = componentutil.WaitForComponentHealthy(context.Background(), s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for DataCoord ready failed")
}
return nil
}
func (s *Server) initGRPCServer() {
log.Info("create StreamingNode server...")
cfg := &paramtable.Get().StreamingNodeGrpcServerCfg
kaep := keepalive.EnforcementPolicy{
MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection
PermitWithoutStream: true, // Allow pings even when there are no active streams
}
kasp := keepalive.ServerParameters{
Time: 60 * time.Second, // Ping the client if it is idle for 60 seconds to ensure the connection is still active
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}
serverIDGetter := func() int64 {
return s.session.ServerID
}
opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(cfg.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(cfg.ServerMaxSendSize.GetAsInt()),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(opts...),
logutil.UnaryTraceLoggerInterceptor,
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(serverIDGetter),
streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
logutil.StreamTraceLoggerInterceptor,
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(serverIDGetter),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)))
}
// allocateAddress allocates a available address for streamingnode grpc server.
func (s *Server) allocateAddress() (err error) {
port := paramtable.Get().StreamingNodeGrpcServerCfg.Port.GetAsInt()
retry.Do(context.Background(), func() error {
addr := ":" + strconv.Itoa(port)
s.lis, err = net.Listen("tcp", addr)
if err != nil {
if port != 0 {
// set port=0 to get next available port by os
log.Warn("StreamingNode suggested port is in used, try to get by os", zap.Error(err))
port = 0
}
}
return err
}, retry.Attempts(10))
return err
}
// getAddress returns the address of streamingnode grpc server.
// must be called after allocateAddress.
func (s *Server) getAddress() (string, error) {
if s.lis == nil {
return "", errors.New("StreamingNode grpc server is not initialized")
}
ip := paramtable.Get().StreamingNodeGrpcServerCfg.IP
return fmt.Sprintf("%s:%d", ip, s.lis.Addr().(*net.TCPAddr).Port), nil
}
// startGRPCServer starts the grpc server.
func (s *Server) startGPRCServer() error {
errCh := make(chan error, 1)
go func() {
defer close(s.grpcServerChan)
if err := s.grpcServer.Serve(s.lis); err != nil {
select {
case errCh <- err:
// failure at initial startup.
default:
// failure at runtime.
panic(errors.Wrapf(err, "grpc server stop with unexpected error"))
}
}
}()
funcutil.CheckGrpcReady(context.Background(), errCh)
return <-errCh
}
// registerSessionToETCD registers current server to etcd.
func (s *Server) registerSessionToETCD() {
s.session.Register()
// start liveness check
s.session.LivenessCheck(context.Background(), func() {
log.Error("StreamingNode disconnected from etcd, process will exit", zap.Int64("Server Id", paramtable.GetNodeID()))
os.Exit(1)
})
}

View File

@ -10,6 +10,8 @@ import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
grpc "google.golang.org/grpc"
indexpb "github.com/milvus-io/milvus/internal/proto/indexpb"
internalpb "github.com/milvus-io/milvus/internal/proto/internalpb"
@ -2318,6 +2320,39 @@ func (_c *MockDataCoord_Register_Call) RunAndReturn(run func() error) *MockDataC
return _c
}
// RegisterStreamingCoordGRPCService provides a mock function with given fields: s
func (_m *MockDataCoord) RegisterStreamingCoordGRPCService(s *grpc.Server) {
_m.Called(s)
}
// MockDataCoord_RegisterStreamingCoordGRPCService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterStreamingCoordGRPCService'
type MockDataCoord_RegisterStreamingCoordGRPCService_Call struct {
*mock.Call
}
// RegisterStreamingCoordGRPCService is a helper method to define mock.On call
// - s *grpc.Server
func (_e *MockDataCoord_Expecter) RegisterStreamingCoordGRPCService(s interface{}) *MockDataCoord_RegisterStreamingCoordGRPCService_Call {
return &MockDataCoord_RegisterStreamingCoordGRPCService_Call{Call: _e.mock.On("RegisterStreamingCoordGRPCService", s)}
}
func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Run(run func(s *grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*grpc.Server))
})
return _c
}
func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Return() *MockDataCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Return()
return _c
}
func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) RunAndReturn(run func(*grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Return(run)
return _c
}
// ReportDataNodeTtMsgs provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) ReportDataNodeTtMsgs(_a0 context.Context, _a1 *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -0,0 +1,48 @@
package server
import (
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/metastore/kv/streamingcoord"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type ServerBuilder struct {
etcdClient *clientv3.Client
metaKV kv.MetaKv
session sessionutil.SessionInterface
}
func NewServerBuilder() *ServerBuilder {
return &ServerBuilder{}
}
func (b *ServerBuilder) WithETCD(e *clientv3.Client) *ServerBuilder {
b.etcdClient = e
return b
}
func (b *ServerBuilder) WithMetaKV(metaKV kv.MetaKv) *ServerBuilder {
b.metaKV = metaKV
return b
}
func (b *ServerBuilder) WithSession(session sessionutil.SessionInterface) *ServerBuilder {
b.session = session
return b
}
func (s *ServerBuilder) Build() *Server {
resource.Init(
resource.OptETCD(s.etcdClient),
resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)),
)
return &Server{
session: s.session,
componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole),
}
}

View File

@ -37,8 +37,8 @@ func Init(opts ...optResourceInit) {
}
assertNotNil(newR.ETCD())
assertNotNil(newR.StreamingCatalog())
// TODO: after add streaming node manager client, remove this line.
// assertNotNil(r.StreamingNodeManagerClient())
newR.streamingNodeManagerClient = manager.NewManagerClient(newR.etcdClient)
assertNotNil(newR.StreamingNodeManagerClient())
r = newR
}

View File

@ -21,12 +21,6 @@ func TestInit(t *testing.T) {
mock_metastore.NewMockStreamingCoordCataLog(t),
))
})
Init(OptETCD(&clientv3.Client{}), OptStreamingCatalog(
mock_metastore.NewMockStreamingCoordCataLog(t),
))
assert.NotNil(t, Resource().StreamingCatalog())
assert.NotNil(t, Resource().ETCD())
}
func TestInitForTest(t *testing.T) {

View File

@ -0,0 +1,82 @@
package server
import (
"context"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
_ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy
"github.com/milvus-io/milvus/internal/streamingcoord/server/service"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
)
// Server is the streamingcoord server.
type Server struct {
// session of current server.
session sessionutil.SessionInterface
// service level variables.
assignmentService service.AssignmentService
componentStateService *componentutil.ComponentStateService // state.
// basic component variables can be used at service level.
balancer balancer.Balancer
}
// Init initializes the streamingcoord server.
func (s *Server) Init(ctx context.Context) (err error) {
log.Info("init streamingcoord server...")
s.componentStateService.OnInitializing()
// Init all underlying component of streamingcoord server.
if err := s.initBasicComponent(ctx); err != nil {
log.Error("init basic component of streamingcoord server failed", zap.Error(err))
return err
}
// Init all grpc service of streamingcoord server.
s.initService()
s.componentStateService.OnInitialized(s.session.GetServerID())
log.Info("streamingcoord server initialized")
return nil
}
// initBasicComponent initialize all underlying dependency for streamingcoord.
func (s *Server) initBasicComponent(ctx context.Context) error {
// Init balancer
var err error
// Read new incoming topics from configuration, and register it into balancer.
newIncomingTopics := util.GetAllTopicsFromConfiguration()
s.balancer, err = balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
return err
}
// initService initializes the grpc service.
func (s *Server) initService() {
s.assignmentService = service.NewAssignmentService(s.balancer)
}
// registerGRPCService register all grpc service to grpc server.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService)
streamingpb.RegisterStreamingCoordStateServiceServer(grpcServer, s.componentStateService)
}
// Start starts the streamingcoord server.
func (s *Server) Start() {
// Just do nothing now.
log.Info("start streamingcoord server")
}
// Stop stops the streamingcoord server.
func (s *Server) Stop() {
s.componentStateService.OnStopping()
log.Info("close balancer...")
s.balancer.Close()
log.Info("streamingcoord server stopped")
}

View File

@ -0,0 +1,42 @@
package server
import (
"context"
"strings"
"testing"
"github.com/stretchr/testify/assert"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func TestServer(t *testing.T) {
paramtable.Init()
params := paramtable.Get()
endpoints := params.EtcdCfg.Endpoints.GetValue()
etcdEndpoints := strings.Split(endpoints, ",")
c, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
assert.NoError(t, err)
assert.NotNil(t, c)
b := NewServerBuilder()
metaKV := etcdkv.NewEtcdKV(c, "test")
s := sessionutil.NewMockSession(t)
s.EXPECT().GetServerID().Return(1)
newServer := b.WithETCD(c).
WithMetaKV(metaKV).
WithSession(s).
Build()
ctx := context.Background()
err = newServer.Init(ctx)
assert.NoError(t, err)
newServer.Start()
newServer.Stop()
}

View File

@ -0,0 +1,71 @@
package server
import (
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// ServerBuilder is used to build a server.
// All component should be initialized before server initialization should be added here.
type ServerBuilder struct {
etcdClient *clientv3.Client
grpcServer *grpc.Server
rc types.RootCoordClient
dc types.DataCoordClient
session *sessionutil.Session
}
// NewServerBuilder creates a new server builder.
func NewServerBuilder() *ServerBuilder {
return &ServerBuilder{}
}
// WithETCD sets etcd client to the server builder.
func (b *ServerBuilder) WithETCD(e *clientv3.Client) *ServerBuilder {
b.etcdClient = e
return b
}
// WithGRPCServer sets grpc server to the server builder.
func (b *ServerBuilder) WithGRPCServer(svr *grpc.Server) *ServerBuilder {
b.grpcServer = svr
return b
}
// WithRootCoordClient sets root coord client to the server builder.
func (b *ServerBuilder) WithRootCoordClient(rc types.RootCoordClient) *ServerBuilder {
b.rc = rc
return b
}
// WithDataCoordClient sets data coord client to the server builder.
func (b *ServerBuilder) WithDataCoordClient(dc types.DataCoordClient) *ServerBuilder {
b.dc = dc
return b
}
// WithSession sets session to the server builder.
func (b *ServerBuilder) WithSession(session *sessionutil.Session) *ServerBuilder {
b.session = session
return b
}
// Build builds a streaming node server.
func (s *ServerBuilder) Build() *Server {
resource.Init(
resource.OptETCD(s.etcdClient),
resource.OptRootCoordClient(s.rc),
resource.OptDataCoordClient(s.dc),
)
return &Server{
session: s.session,
grpcServer: s.grpcServer,
componentStateService: componentutil.NewComponentStateService(typeutil.StreamingNodeRole),
}
}

View File

@ -48,6 +48,7 @@ func Init(opts ...optResourceInit) {
assertNotNil(r.TSOAllocator())
assertNotNil(r.ETCD())
assertNotNil(r.RootCoordClient())
assertNotNil(r.DataCoordClient())
}
// Resource access the underlying singleton of resources.

View File

@ -19,7 +19,11 @@ func TestInit(t *testing.T) {
assert.Panics(t, func() {
Init(OptRootCoordClient(mocks.NewMockRootCoordClient(t)))
})
Init(OptETCD(&clientv3.Client{}), OptRootCoordClient(mocks.NewMockRootCoordClient(t)))
Init(
OptETCD(&clientv3.Client{}),
OptRootCoordClient(mocks.NewMockRootCoordClient(t)),
OptDataCoordClient(mocks.NewMockDataCoordClient(t)),
)
assert.NotNil(t, Resource().TSOAllocator())
assert.NotNil(t, Resource().ETCD())

View File

@ -0,0 +1,90 @@
package server
import (
"context"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/streamingpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/service"
"github.com/milvus-io/milvus/internal/streamingnode/server/walmanager"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
// Server is the streamingnode server.
type Server struct {
// session of current server.
session *sessionutil.Session
grpcServer *grpc.Server
// service level instances.
handlerService service.HandlerService
managerService service.ManagerService
componentStateService *componentutil.ComponentStateService // state.
// basic component instances.
walManager walmanager.Manager
}
// Init initializes the streamingnode server.
func (s *Server) Init(ctx context.Context) (err error) {
log.Info("init streamingnode server...")
s.componentStateService.OnInitializing()
// init all basic components.
s.initBasicComponent(ctx)
// init all service.
s.initService(ctx)
log.Info("streamingnode server initialized")
s.componentStateService.OnInitialized(s.session.ServerID)
return nil
}
// Start starts the streamingnode server.
func (s *Server) Start() {
// Just do nothing now.
}
// Stop stops the streamingnode server.
func (s *Server) Stop() {
log.Info("stopping streamingnode server...")
s.componentStateService.OnStopping()
log.Info("close wal manager...")
s.walManager.Close()
log.Info("streamingnode server stopped")
}
// Health returns the health status of the streamingnode server.
func (s *Server) Health(ctx context.Context) commonpb.StateCode {
resp, _ := s.componentStateService.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
return resp.State.StateCode
}
// initBasicComponent initialize all underlying dependency for streamingnode.
func (s *Server) initBasicComponent(_ context.Context) {
var err error
s.walManager, err = walmanager.OpenManager()
if err != nil {
panic("open wal manager failed")
}
}
// initService initializes the grpc service.
func (s *Server) initService(_ context.Context) {
s.handlerService = service.NewHandlerService(s.walManager)
s.managerService = service.NewManagerService(s.walManager)
s.registerGRPCService(s.grpcServer)
}
// registerGRPCService register all grpc service to grpc server.
func (s *Server) registerGRPCService(grpcServer *grpc.Server) {
streamingpb.RegisterStreamingNodeHandlerServiceServer(grpcServer, s.handlerService)
streamingpb.RegisterStreamingNodeManagerServiceServer(grpcServer, s.managerService)
streamingpb.RegisterStreamingNodeStateServiceServer(grpcServer, s.componentStateService)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -118,6 +119,8 @@ type DataCoord interface {
type DataCoordComponent interface {
DataCoord
RegisterStreamingCoordGRPCService(s *grpc.Server)
SetAddress(address string)
// SetEtcdClient set EtcdClient for DataCoord
// `etcdClient` is a client of etcd

View File

@ -0,0 +1,89 @@
package componentutil
import (
"context"
"sync"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// NewComponentStateService create a ComponentStateService
func NewComponentStateService(role string) *ComponentStateService {
return &ComponentStateService{
nodeID: common.NotRegisteredID,
role: role,
stateCode: commonpb.StateCode_StandBy,
}
}
// ComponentStateService is a helper type to implement a GetComponentStates rpc at server side.
// StandBy -> Initializing -> Healthy -> Abnormal -> Healthy
// All can transfer into Stopping
type ComponentStateService struct {
mu sync.Mutex
nodeID int64
role string
stateCode commonpb.StateCode
}
// OnInitializing set the state to initializing
func (s *ComponentStateService) OnInitializing() {
s.mu.Lock()
defer s.mu.Unlock()
if s.stateCode != commonpb.StateCode_StandBy {
panic("standby -> initializing")
}
s.stateCode = commonpb.StateCode_Initializing
}
func (s *ComponentStateService) OnInitialized(nodeID int64) {
s.mu.Lock()
defer s.mu.Unlock()
s.nodeID = nodeID
if s.stateCode != commonpb.StateCode_Initializing {
panic("initializing -> healthy")
}
s.stateCode = commonpb.StateCode_Healthy
}
func (s *ComponentStateService) OnHealthy() {
s.mu.Lock()
if s.stateCode == commonpb.StateCode_Abnormal {
s.stateCode = commonpb.StateCode_Healthy
}
s.mu.Unlock()
}
func (s *ComponentStateService) OnAbnormal() {
s.mu.Lock()
if s.stateCode == commonpb.StateCode_Healthy {
s.stateCode = commonpb.StateCode_Abnormal
}
s.mu.Unlock()
}
func (s *ComponentStateService) OnStopping() {
s.mu.Lock()
s.stateCode = commonpb.StateCode_Stopping
s.mu.Unlock()
}
// GetComponentStates get the component state of a milvus node.
func (s *ComponentStateService) GetComponentStates(ctx context.Context, _ *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
s.mu.Lock()
code := s.stateCode
nodeID := s.nodeID
s.mu.Unlock()
return &milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{
NodeID: nodeID,
Role: s.role,
StateCode: code,
},
Status: merr.Status(nil),
}, nil
}

View File

@ -0,0 +1,46 @@
package componentutil
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
)
func TestComponentStateService(t *testing.T) {
ctx := context.Background()
s := NewComponentStateService("role")
resp, err := s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_StandBy, resp.State.StateCode)
s.OnInitializing()
resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Initializing, resp.State.StateCode)
s.OnInitialized(1)
resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, resp.State.StateCode)
assert.Equal(t, "role", resp.State.Role)
assert.Equal(t, int64(1), resp.State.NodeID)
s.OnAbnormal()
resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Abnormal, resp.State.StateCode)
s.OnHealthy()
resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, resp.State.StateCode)
s.OnStopping()
resp, err = s.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Stopping, resp.State.StateCode)
}

View File

@ -0,0 +1,16 @@
package streamingutil
import "os"
// IsStreamingServiceEnabled returns whether the streaming service is enabled.
func IsStreamingServiceEnabled() bool {
// TODO: check if the environment variable MILVUS_STREAMING_SERVICE_ENABLED is set
return os.Getenv("MILVUS_STREAMING_SERVICE_ENABLED") == "1"
}
// MustEnableStreamingService panics if the streaming service is not enabled.
func MustEnableStreamingService() {
if !IsStreamingServiceEnabled() {
panic("start a streaming node without enabling streaming service, please set environment variable MILVUS_STREAMING_SERVICE_ENABLED = 1")
}
}

View File

@ -89,6 +89,7 @@ type ComponentParam struct {
DataCoordGrpcServerCfg GrpcServerConfig
DataNodeGrpcServerCfg GrpcServerConfig
IndexNodeGrpcServerCfg GrpcServerConfig
StreamingNodeGrpcServerCfg GrpcServerConfig
RootCoordGrpcClientCfg GrpcClientConfig
ProxyGrpcClientCfg GrpcClientConfig
@ -99,7 +100,6 @@ type ComponentParam struct {
IndexNodeGrpcClientCfg GrpcClientConfig
StreamingCoordGrpcClientCfg GrpcClientConfig
StreamingNodeGrpcClientCfg GrpcClientConfig
IntegrationTestCfg integrationTestConfig
RuntimeConfig runtimeConfig
@ -130,6 +130,8 @@ func (p *ComponentParam) init(bt *BaseTable) {
p.DataCoordCfg.init(bt)
p.DataNodeCfg.init(bt)
p.IndexNodeCfg.init(bt)
p.StreamingCoordCfg.init(bt)
p.StreamingNodeCfg.init(bt)
p.HTTPCfg.init(bt)
p.LogCfg.init(bt)
p.RoleCfg.init(bt)
@ -145,6 +147,7 @@ func (p *ComponentParam) init(bt *BaseTable) {
p.DataCoordGrpcServerCfg.Init("dataCoord", bt)
p.DataNodeGrpcServerCfg.Init("dataNode", bt)
p.IndexNodeGrpcServerCfg.Init("indexNode", bt)
p.StreamingNodeGrpcServerCfg.Init("streamingNode", bt)
p.RootCoordGrpcClientCfg.Init("rootCoord", bt)
p.ProxyGrpcClientCfg.Init("proxy", bt)