enhance: move streaming coord from datacoord to rootcoord (#39007)

issue: #38399

We want to support broadcast operation for both streaming and msgstream.
But msgstream can be only sent message from rootcoord and proxy.
So this pr move the streamingcoord to rootcoord to make easier
implementation.

---------

Signed-off-by: chyezh <chyezh@outlook.com>
pull/39050/head
Zhen Ye 2025-01-07 17:42:57 +08:00 committed by GitHub
parent 3dc95153b7
commit c5a7000a92
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 241 additions and 177 deletions

View File

@ -50,9 +50,6 @@ type LocalClientRoleConfig struct {
// EnableLocalClientRole init localable roles
func EnableLocalClientRole(cfg *LocalClientRoleConfig) {
if !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
return
}
if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole {
return
}
@ -93,7 +90,7 @@ func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient {
var client types.QueryCoordClient
var err error
if enableLocal.EnableQueryCoord {
if enableLocal.EnableQueryCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
client, err = glocalClient.queryCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
@ -109,7 +106,7 @@ func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient {
func GetDataCoordClient(ctx context.Context) types.DataCoordClient {
var client types.DataCoordClient
var err error
if enableLocal.EnableDataCoord {
if enableLocal.EnableDataCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
client, err = glocalClient.dataCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
@ -125,7 +122,7 @@ func GetDataCoordClient(ctx context.Context) types.DataCoordClient {
func GetRootCoordClient(ctx context.Context) types.RootCoordClient {
var client types.RootCoordClient
var err error
if enableLocal.EnableRootCoord {
if enableLocal.EnableRootCoord && paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
client, err = glocalClient.rootCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
@ -137,6 +134,12 @@ func GetRootCoordClient(ctx context.Context) types.RootCoordClient {
return client
}
// MustGetLocalRootCoordClientFuture return root coord client future,
// panic if root coord client is not enabled
func MustGetLocalRootCoordClientFuture() *syncutil.Future[types.RootCoordClient] {
return glocalClient.rootCoordClient
}
type nopCloseQueryCoordClient struct {
querypb.QueryCoordClient
}

View File

@ -33,7 +33,6 @@ import (
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -49,7 +48,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil"
@ -163,9 +161,6 @@ type Server struct {
// manage ways that data coord access other coord
broker broker.Broker
// streamingcoord server is embedding in datacoord now.
streamingCoord *streamingcoord.Server
metricsRequest *metricsinfo.MetricsRequest
}
@ -312,12 +307,6 @@ func (s *Server) Init() error {
if err := s.initKV(); err != nil {
return err
}
if streamingutil.IsStreamingServiceEnabled() {
s.streamingCoord = streamingcoord.NewServerBuilder().
WithETCD(s.etcdCli).
WithMetaKV(s.kv).
WithSession(s.session).Build()
}
if s.enableActiveStandBy {
s.activateFunc = func() error {
log.Info("DataCoord switch from standby to active, activating")
@ -327,11 +316,6 @@ func (s *Server) Init() error {
}
s.startDataCoord()
log.Info("DataCoord startup success")
if s.streamingCoord != nil {
s.streamingCoord.Start()
log.Info("StreamingCoord stratup successfully at standby mode")
}
return nil
}
s.stateCode.Store(commonpb.StateCode_StandBy)
@ -342,10 +326,6 @@ func (s *Server) Init() error {
return s.initDataCoord()
}
func (s *Server) RegisterStreamingCoordGRPCService(server *grpc.Server) {
s.streamingCoord.RegisterGRPCService(server)
}
func (s *Server) initDataCoord() error {
log := log.Ctx(s.ctx)
s.stateCode.Store(commonpb.StateCode_Initializing)
@ -376,15 +356,6 @@ func (s *Server) initDataCoord() error {
return err
}
// Initialize streaming coordinator.
if streamingutil.IsStreamingServiceEnabled() {
if err = s.streamingCoord.Init(context.TODO()); err != nil {
return err
}
log.Info("init streaming coordinator done")
}
s.handler = newServerHandler(s)
// check whether old node exist, if yes suspend auto balance until all old nodes down
@ -445,10 +416,6 @@ func (s *Server) Start() error {
if !s.enableActiveStandBy {
s.startDataCoord()
log.Info("DataCoord startup successfully")
if s.streamingCoord != nil {
s.streamingCoord.Start()
log.Info("StreamingCoord stratup successfully")
}
}
return nil
@ -1102,12 +1069,6 @@ func (s *Server) Stop() error {
s.garbageCollector.close()
log.Info("datacoord garbage collector stopped")
if s.streamingCoord != nil {
log.Info("StreamingCoord stoping...")
s.streamingCoord.Stop()
log.Info("StreamingCoord stopped")
}
s.stopServerLoop()
s.importScheduler.Close()

View File

@ -41,8 +41,6 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/streamingutil"
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
@ -190,7 +188,6 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
logutil.StreamTraceLoggerInterceptor,
@ -201,7 +198,6 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}
@ -210,10 +206,6 @@ func (s *Server) startGrpcLoop() {
s.grpcServer = grpc.NewServer(grpcOpts...)
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
datapb.RegisterDataCoordServer(s.grpcServer, s)
// register the streaming coord grpc service.
if streamingutil.IsStreamingServiceEnabled() {
s.dataCoord.RegisterStreamingCoordGRPCService(s.grpcServer)
}
coordclient.RegisterDataCoordServer(s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(s.listener); err != nil {

View File

@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
streamingserviceinterceptor "github.com/milvus-io/milvus/internal/util/streamingutil/service/interceptor"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/tracer"
"github.com/milvus-io/milvus/pkg/util"
@ -214,12 +215,6 @@ func (s *Server) init() error {
log.Info("Connected to tikv. Using tikv as metadata storage.")
}
err = s.startGrpc()
if err != nil {
return err
}
log.Info("grpc init done ...")
if s.newDataCoordClient != nil {
log.Info("RootCoord start to create DataCoord client")
dataCoord := s.newDataCoordClient(s.ctx)
@ -238,7 +233,17 @@ func (s *Server) init() error {
}
}
return s.rootCoord.Init()
if err := s.rootCoord.Init(); err != nil {
return err
}
log.Info("RootCoord init done ...")
err = s.startGrpc()
if err != nil {
return err
}
log.Info("grpc init done ...")
return nil
}
func (s *Server) startGrpc() error {
@ -281,6 +286,7 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
logutil.StreamTraceLoggerInterceptor,
@ -291,6 +297,7 @@ func (s *Server) startGrpcLoop() {
}
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}
@ -298,6 +305,7 @@ func (s *Server) startGrpcLoop() {
grpcOpts = append(grpcOpts, utils.EnableInternalTLS("RootCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
rootcoordpb.RegisterRootCoordServer(s.grpcServer, s)
s.rootCoord.RegisterStreamingCoordGRPCService(s.grpcServer)
coordclient.RegisterRootCoordServer(s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)

View File

@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -97,6 +98,9 @@ func (m *mockCore) SetQueryCoordClient(client types.QueryCoordClient) error {
func (m *mockCore) SetProxyCreator(func(ctx context.Context, addr string, nodeID int64) (types.ProxyClient, error)) {
}
func (m *mockCore) RegisterStreamingCoordGRPCService(server *grpc.Server) {
}
func (m *mockCore) Register() error {
return nil
}

View File

@ -10,8 +10,6 @@ import (
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
grpc "google.golang.org/grpc"
indexpb "github.com/milvus-io/milvus/internal/proto/indexpb"
internalpb "github.com/milvus-io/milvus/internal/proto/internalpb"
@ -2606,39 +2604,6 @@ func (_c *MockDataCoord_Register_Call) RunAndReturn(run func() error) *MockDataC
return _c
}
// RegisterStreamingCoordGRPCService provides a mock function with given fields: s
func (_m *MockDataCoord) RegisterStreamingCoordGRPCService(s *grpc.Server) {
_m.Called(s)
}
// MockDataCoord_RegisterStreamingCoordGRPCService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterStreamingCoordGRPCService'
type MockDataCoord_RegisterStreamingCoordGRPCService_Call struct {
*mock.Call
}
// RegisterStreamingCoordGRPCService is a helper method to define mock.On call
// - s *grpc.Server
func (_e *MockDataCoord_Expecter) RegisterStreamingCoordGRPCService(s interface{}) *MockDataCoord_RegisterStreamingCoordGRPCService_Call {
return &MockDataCoord_RegisterStreamingCoordGRPCService_Call{Call: _e.mock.On("RegisterStreamingCoordGRPCService", s)}
}
func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Run(run func(s *grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*grpc.Server))
})
return _c
}
func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) Return() *MockDataCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Return()
return _c
}
func (_c *MockDataCoord_RegisterStreamingCoordGRPCService_Call) RunAndReturn(run func(*grpc.Server)) *MockDataCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Return(run)
return _c
}
// ReportDataNodeTtMsgs provides a mock function with given fields: _a0, _a1
func (_m *MockDataCoord) ReportDataNodeTtMsgs(_a0 context.Context, _a1 *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -8,6 +8,8 @@ import (
commonpb "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
clientv3 "go.etcd.io/etcd/client/v3"
grpc "google.golang.org/grpc"
internalpb "github.com/milvus-io/milvus/internal/proto/internalpb"
milvuspb "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -2663,6 +2665,39 @@ func (_c *RootCoord_Register_Call) RunAndReturn(run func() error) *RootCoord_Reg
return _c
}
// RegisterStreamingCoordGRPCService provides a mock function with given fields: server
func (_m *RootCoord) RegisterStreamingCoordGRPCService(server *grpc.Server) {
_m.Called(server)
}
// RootCoord_RegisterStreamingCoordGRPCService_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterStreamingCoordGRPCService'
type RootCoord_RegisterStreamingCoordGRPCService_Call struct {
*mock.Call
}
// RegisterStreamingCoordGRPCService is a helper method to define mock.On call
// - server *grpc.Server
func (_e *RootCoord_Expecter) RegisterStreamingCoordGRPCService(server interface{}) *RootCoord_RegisterStreamingCoordGRPCService_Call {
return &RootCoord_RegisterStreamingCoordGRPCService_Call{Call: _e.mock.On("RegisterStreamingCoordGRPCService", server)}
}
func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) Run(run func(server *grpc.Server)) *RootCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*grpc.Server))
})
return _c
}
func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) Return() *RootCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Return()
return _c
}
func (_c *RootCoord_RegisterStreamingCoordGRPCService_Call) RunAndReturn(run func(*grpc.Server)) *RootCoord_RegisterStreamingCoordGRPCService_Call {
_c.Call.Return(run)
return _c
}
// RenameCollection provides a mock function with given fields: _a0, _a1
func (_m *RootCoord) RenameCollection(_a0 context.Context, _a1 *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -33,11 +33,13 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/tikv"
"github.com/milvus-io/milvus/internal/metastore"
@ -47,6 +49,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
tso2 "github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
@ -84,7 +87,7 @@ var Params *paramtable.ComponentParam = paramtable.Get()
type Opt func(*Core)
type metaKVCreator func() (kv.MetaKv, error)
type metaKVCreator func() kv.MetaKv
// Core root coordinator core
type Core struct {
@ -130,6 +133,8 @@ type Core struct {
activateFunc func() error
metricsRequest *metricsinfo.MetricsRequest
streamingCoord *streamingcoord.Server
}
// --------------------- function --------------------------
@ -327,19 +332,28 @@ func (c *Core) initSession() error {
func (c *Core) initKVCreator() {
if c.metaKVCreator == nil {
if Params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
c.metaKVCreator = func() (kv.MetaKv, error) {
c.metaKVCreator = func() kv.MetaKv {
return tikv.NewTiKV(c.tikvCli, Params.TiKVCfg.MetaRootPath.GetValue(),
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
tikv.WithRequestTimeout(paramtable.Get().ServiceParam.TiKVCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
}
} else {
c.metaKVCreator = func() (kv.MetaKv, error) {
c.metaKVCreator = func() kv.MetaKv {
return etcdkv.NewEtcdKV(c.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue(),
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond))), nil
etcdkv.WithRequestTimeout(paramtable.Get().ServiceParam.EtcdCfg.RequestTimeout.GetAsDuration(time.Millisecond)))
}
}
}
}
func (c *Core) initStreamingCoord() {
c.streamingCoord = streamingcoord.NewServerBuilder().
WithETCD(c.etcdCli).
WithMetaKV(c.metaKVCreator()).
WithSession(c.session).
WithRootCoordClient(coordclient.MustGetLocalRootCoordClientFuture()).
Build()
}
func (c *Core) initMetaTable(initCtx context.Context) error {
fn := func() error {
var catalog metastore.RootCoordCatalog
@ -348,28 +362,20 @@ func (c *Core) initMetaTable(initCtx context.Context) error {
switch Params.MetaStoreCfg.MetaStoreType.GetValue() {
case util.MetaStoreTypeEtcd:
log.Ctx(initCtx).Info("Using etcd as meta storage.")
var metaKV kv.MetaKv
var ss *kvmetestore.SuffixSnapshot
var err error
if metaKV, err = c.metaKVCreator(); err != nil {
return err
}
metaKV := c.metaKVCreator()
if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.EtcdCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
return err
}
catalog = &kvmetestore.Catalog{Txn: metaKV, Snapshot: ss}
case util.MetaStoreTypeTiKV:
log.Ctx(initCtx).Info("Using tikv as meta storage.")
var metaKV kv.MetaKv
var ss *kvmetestore.SuffixSnapshot
var err error
if metaKV, err = c.metaKVCreator(); err != nil {
return err
}
metaKV := c.metaKVCreator()
if ss, err = kvmetestore.NewSuffixSnapshot(metaKV, kvmetestore.SnapshotsSep, Params.TiKVCfg.MetaRootPath.GetValue(), kvmetestore.SnapshotPrefix); err != nil {
return err
}
@ -442,7 +448,6 @@ func (c *Core) initInternal() error {
log := log.Ctx(initCtx)
c.UpdateStateCode(commonpb.StateCode_Initializing)
c.initKVCreator()
if err := c.initIDAllocator(initCtx); err != nil {
return err
@ -470,6 +475,10 @@ func (c *Core) initInternal() error {
c.garbageCollector = newBgGarbageCollector(c)
c.stepExecutor = newBgStepExecutor(c.ctx)
if err := c.streamingCoord.Start(c.ctx); err != nil {
log.Info("start streaming coord failed", zap.Error(err))
return err
}
if !streamingutil.IsStreamingServiceEnabled() {
c.proxyWatcher = proxyutil.NewProxyWatcher(
c.etcdCli,
@ -523,6 +532,8 @@ func (c *Core) Init() error {
if err := c.initSession(); err != nil {
return err
}
c.initKVCreator()
c.initStreamingCoord()
if c.enableActiveStandBy {
c.activateFunc = func() error {
@ -814,6 +825,10 @@ func (c *Core) Stop() error {
c.UpdateStateCode(commonpb.StateCode_Abnormal)
c.stopExecutor()
c.stopScheduler()
if c.streamingCoord != nil {
c.streamingCoord.Stop()
}
if c.proxyWatcher != nil {
c.proxyWatcher.Stop()
}
@ -3125,3 +3140,8 @@ func (c *Core) getPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGro
}
return allGroups, nil
}
// RegisterStreamingCoordGRPCService registers the grpc service of streaming coordinator.
func (s *Core) RegisterStreamingCoordGRPCService(server *grpc.Server) {
s.streamingCoord.RegisterGRPCService(server)
}

View File

@ -44,8 +44,8 @@ type Client interface {
// NewClient creates a new client.
func NewClient(etcdCli *clientv3.Client) Client {
// StreamingCoord is deployed on DataCoord node.
role := sessionutil.GetSessionPrefixByRole(typeutil.DataCoordRole)
rb := resolver.NewSessionBuilder(etcdCli, role)
role := sessionutil.GetSessionPrefixByRole(typeutil.RootCoordRole)
rb := resolver.NewSessionExclusiveBuilder(etcdCli, role)
dialTimeout := paramtable.Get().StreamingCoordGrpcClientCfg.DialTimeout.GetAsDuration(time.Millisecond)
dialOptions := getDialOptions(rb)
conn := lazygrpc.NewConn(func(ctx context.Context) (*grpc.ClientConn, error) {
@ -53,7 +53,7 @@ func NewClient(etcdCli *clientv3.Client) Client {
defer cancel()
return grpc.DialContext(
ctx,
resolver.SessionResolverScheme+":///"+typeutil.DataCoordRole,
resolver.SessionResolverScheme+":///"+typeutil.RootCoordRole,
dialOptions...,
)
})

View File

@ -32,7 +32,7 @@ func RecoverBalancer(
}
b := &balancerImpl{
lifetime: typeutil.NewLifetime(),
logger: log.With(zap.String("policy", policy)),
logger: resource.Resource().Logger().With(log.FieldComponent("balancer"), zap.String("policy", policy)),
channelMetaManager: manager,
policy: mustGetPolicy(policy),
reqCh: make(chan *request, 5),

View File

@ -7,17 +7,18 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/internal/streamingcoord/server/service"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type ServerBuilder struct {
etcdClient *clientv3.Client
metaKV kv.MetaKv
session sessionutil.SessionInterface
etcdClient *clientv3.Client
metaKV kv.MetaKv
session sessionutil.SessionInterface
rootCoordClient *syncutil.Future[types.RootCoordClient]
}
func NewServerBuilder() *ServerBuilder {
@ -34,6 +35,11 @@ func (b *ServerBuilder) WithMetaKV(metaKV kv.MetaKv) *ServerBuilder {
return b
}
func (b *ServerBuilder) WithRootCoordClient(rootCoordClient *syncutil.Future[types.RootCoordClient]) *ServerBuilder {
b.rootCoordClient = rootCoordClient
return b
}
func (b *ServerBuilder) WithSession(session sessionutil.SessionInterface) *ServerBuilder {
b.session = session
return b
@ -43,12 +49,13 @@ func (s *ServerBuilder) Build() *Server {
resource.Init(
resource.OptETCD(s.etcdClient),
resource.OptStreamingCatalog(streamingcoord.NewCataLog(s.metaKV)),
resource.OptRootCoordClient(s.rootCoordClient),
)
balancer := syncutil.NewFuture[balancer.Balancer]()
return &Server{
session: s.session,
componentStateService: componentutil.NewComponentStateService(typeutil.StreamingCoordRole),
assignmentService: service.NewAssignmentService(balancer),
balancer: balancer,
logger: resource.Resource().Logger().With(log.FieldComponent("server")),
session: s.session,
assignmentService: service.NewAssignmentService(balancer),
balancer: balancer,
}
}

View File

@ -7,6 +7,10 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/streamingnode/client/manager"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
var r *resourceImpl // singleton resource instance
@ -21,6 +25,13 @@ func OptETCD(etcd *clientv3.Client) optResourceInit {
}
}
// OptRootCoordClient provides the root coordinator client to the resource.
func OptRootCoordClient(rootCoordClient *syncutil.Future[types.RootCoordClient]) optResourceInit {
return func(r *resourceImpl) {
r.rootCoordClient = rootCoordClient
}
}
// OptStreamingCatalog provides streaming catalog to the resource.
func OptStreamingCatalog(catalog metastore.StreamingCoordCataLog) optResourceInit {
return func(r *resourceImpl) {
@ -31,10 +42,13 @@ func OptStreamingCatalog(catalog metastore.StreamingCoordCataLog) optResourceIni
// Init initializes the singleton of resources.
// Should be call when streaming node startup.
func Init(opts ...optResourceInit) {
newR := &resourceImpl{}
newR := &resourceImpl{
logger: log.With(log.FieldModule(typeutil.StreamingCoordRole)),
}
for _, opt := range opts {
opt(newR)
}
assertNotNil(newR.RootCoordClient())
assertNotNil(newR.ETCD())
assertNotNil(newR.StreamingCatalog())
newR.streamingNodeManagerClient = manager.NewManagerClient(newR.etcdClient)
@ -50,9 +64,16 @@ func Resource() *resourceImpl {
// resourceImpl is a basic resource dependency for streamingnode server.
// All utility on it is concurrent-safe and singleton.
type resourceImpl struct {
rootCoordClient *syncutil.Future[types.RootCoordClient]
etcdClient *clientv3.Client
streamingCatalog metastore.StreamingCoordCataLog
streamingNodeManagerClient manager.ManagerClient
logger *log.MLogger
}
// RootCoordClient returns the root coordinator client.
func (r *resourceImpl) RootCoordClient() *syncutil.Future[types.RootCoordClient] {
return r.rootCoordClient
}
// StreamingCatalog returns the StreamingCatalog client.
@ -70,6 +91,10 @@ func (r *resourceImpl) StreamingNodeManagerClient() manager.ManagerClient {
return r.streamingNodeManagerClient
}
func (r *resourceImpl) Logger() *log.MLogger {
return r.logger
}
// assertNotNil panics if the resource is nil.
func assertNotNil(v interface{}) {
iv := reflect.ValueOf(v)

View File

@ -5,6 +5,7 @@ package resource
import (
"github.com/milvus-io/milvus/internal/streamingnode/client/manager"
"github.com/milvus-io/milvus/pkg/log"
)
// OptStreamingManagerClient provides streaming manager client to the resource.
@ -16,7 +17,9 @@ func OptStreamingManagerClient(c manager.ManagerClient) optResourceInit {
// InitForTest initializes the singleton of resources for test.
func InitForTest(opts ...optResourceInit) {
r = &resourceImpl{}
r = &resourceImpl{
logger: log.With(),
}
for _, opt := range opts {
opt(r)
}

View File

@ -9,72 +9,76 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
_ "github.com/milvus-io/milvus/internal/streamingcoord/server/balancer/policy" // register the balancer policy
"github.com/milvus-io/milvus/internal/streamingcoord/server/service"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
// Server is the streamingcoord server.
type Server struct {
logger *log.MLogger
// session of current server.
session sessionutil.SessionInterface
// service level variables.
assignmentService service.AssignmentService
componentStateService *componentutil.ComponentStateService // state.
assignmentService service.AssignmentService
// basic component variables can be used at service level.
balancer *syncutil.Future[balancer.Balancer]
}
// Init initializes the streamingcoord server.
func (s *Server) Init(ctx context.Context) (err error) {
log.Info("init streamingcoord server...")
// Init all underlying component of streamingcoord server.
func (s *Server) Start(ctx context.Context) (err error) {
s.logger.Info("init streamingcoord...")
if err := s.initBasicComponent(ctx); err != nil {
log.Error("init basic component of streamingcoord server failed", zap.Error(err))
s.logger.Warn("init basic component of streamingcoord failed", zap.Error(err))
return err
}
// Init all grpc service of streamingcoord server.
s.componentStateService.OnInitialized(s.session.GetServerID())
log.Info("streamingcoord server initialized")
s.logger.Info("streamingcoord initialized")
return nil
}
// initBasicComponent initialize all underlying dependency for streamingcoord.
func (s *Server) initBasicComponent(ctx context.Context) error {
// Init balancer
var err error
// Read new incoming topics from configuration, and register it into balancer.
newIncomingTopics := util.GetAllTopicsFromConfiguration()
balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
if err != nil {
return err
func (s *Server) initBasicComponent(ctx context.Context) (err error) {
if streamingutil.IsStreamingServiceEnabled() {
fBalancer := conc.Go(func() (struct{}, error) {
s.logger.Info("start recovery balancer...")
// Read new incoming topics from configuration, and register it into balancer.
newIncomingTopics := util.GetAllTopicsFromConfiguration()
balancer, err := balancer.RecoverBalancer(ctx, "pchannel_count_fair", newIncomingTopics.Collect()...)
if err != nil {
s.logger.Warn("recover balancer failed", zap.Error(err))
return struct{}{}, err
}
s.balancer.Set(balancer)
s.logger.Info("recover balancer done")
return struct{}{}, nil
})
return conc.AwaitAll(fBalancer)
}
s.balancer.Set(balancer)
return err
return nil
}
// registerGRPCService register all grpc service to grpc server.
// RegisterGRPCService register all grpc service to grpc server.
func (s *Server) RegisterGRPCService(grpcServer *grpc.Server) {
streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService)
streamingpb.RegisterStreamingCoordStateServiceServer(grpcServer, s.componentStateService)
if streamingutil.IsStreamingServiceEnabled() {
streamingpb.RegisterStreamingCoordAssignmentServiceServer(grpcServer, s.assignmentService)
}
}
// Start starts the streamingcoord server.
func (s *Server) Start() {
// Just do nothing now.
log.Info("start streamingcoord server")
}
// Stop stops the streamingcoord server.
// Close closes the streamingcoord server.
func (s *Server) Stop() {
s.componentStateService.OnStopping()
log.Info("close balancer...")
s.balancer.Get().Close()
log.Info("streamingcoord server stopped")
if s.balancer.Ready() {
s.logger.Info("start close balancer...")
s.balancer.Get().Close()
} else {
s.logger.Info("balancer not ready, skip close")
}
s.logger.Info("streamingcoord server stopped")
}

View File

@ -8,6 +8,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/streamingcoord/server/balancer"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
@ -27,7 +28,7 @@ func NewAssignmentDiscoverServer(
streamServer: discoverGrpcServerHelper{
streamServer,
},
logger: log.With(),
logger: resource.Resource().Logger().With(log.FieldComponent("assignment-discover-server")),
}
}

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/milvus-io/milvus/internal/mocks/streamingcoord/server/mock_balancer"
"github.com/milvus-io/milvus/internal/streamingcoord/server/resource"
"github.com/milvus-io/milvus/pkg/mocks/streaming/proto/mock_streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
@ -15,6 +16,7 @@ import (
)
func TestAssignmentDiscover(t *testing.T) {
resource.InitForTest()
b := mock_balancer.NewMockBalancer(t)
b.EXPECT().WatchChannelAssignments(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cb func(typeutil.VersionInt64Pair, []types.PChannelInfoAssigned) error) error {
versions := []typeutil.VersionInt64Pair{

View File

@ -8,9 +8,11 @@ import (
"github.com/stretchr/testify/assert"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
func TestServer(t *testing.T) {
@ -27,16 +29,15 @@ func TestServer(t *testing.T) {
b := NewServerBuilder()
metaKV := etcdkv.NewEtcdKV(c, "test")
s := sessionutil.NewMockSession(t)
s.EXPECT().GetServerID().Return(1)
f := syncutil.NewFuture[types.RootCoordClient]()
newServer := b.WithETCD(c).
WithMetaKV(metaKV).
WithSession(s).
WithRootCoordClient(f).
Build()
ctx := context.Background()
err = newServer.Init(ctx)
err = newServer.Start(ctx)
assert.NoError(t, err)
newServer.Start()
newServer.Stop()
}

View File

@ -121,8 +121,6 @@ type DataCoord interface {
type DataCoordComponent interface {
DataCoord
RegisterStreamingCoordGRPCService(s *grpc.Server)
SetAddress(address string)
// SetEtcdClient set EtcdClient for DataCoord
// `etcdClient` is a client of etcd
@ -213,6 +211,8 @@ type RootCoordComponent interface {
// GetMetrics notifies RootCoordComponent to collect metrics for specified component
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
RegisterStreamingCoordGRPCService(server *grpc.Server)
}
// ProxyClient is the client interface for proxy server

View File

@ -17,12 +17,13 @@ import (
)
// NewSessionDiscoverer returns a new Discoverer for the milvus session registration.
func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, minimumVersion string) Discoverer {
func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, exclusive bool, minimumVersion string) Discoverer {
return &sessionDiscoverer{
etcdCli: etcdCli,
prefix: prefix,
exclusive: exclusive,
versionRange: semver.MustParseRange(">=" + minimumVersion),
logger: log.With(zap.String("prefix", prefix), zap.String("expectedVersion", minimumVersion)),
logger: log.With(zap.String("prefix", prefix), zap.Bool("exclusive", exclusive), zap.String("expectedVersion", minimumVersion)),
revision: 0,
peerSessions: make(map[string]*sessionutil.SessionRaw),
}
@ -32,6 +33,7 @@ func NewSessionDiscoverer(etcdCli *clientv3.Client, prefix string, minimumVersio
type sessionDiscoverer struct {
etcdCli *clientv3.Client
prefix string
exclusive bool // if exclusive, only one session is allowed, not use the prefix, only use the role directly.
logger *log.MLogger
versionRange semver.Range
revision int64
@ -64,12 +66,15 @@ func (sw *sessionDiscoverer) Discover(ctx context.Context, cb func(VersionedStat
// watch performs the watch on etcd.
func (sw *sessionDiscoverer) watch(ctx context.Context, cb func(VersionedState) error) error {
opts := []clientv3.OpOption{clientv3.WithRev(sw.revision + 1)}
if !sw.exclusive {
opts = append(opts, clientv3.WithPrefix())
}
// start a watcher at background.
eventCh := sw.etcdCli.Watch(
ctx,
sw.prefix,
clientv3.WithPrefix(),
clientv3.WithRev(sw.revision+1),
opts...,
)
for {
@ -124,7 +129,11 @@ func (sw *sessionDiscoverer) handleETCDEvent(resp clientv3.WatchResponse) error
// initDiscover initializes the discoverer if needed.
func (sw *sessionDiscoverer) initDiscover(ctx context.Context) error {
resp, err := sw.etcdCli.Get(ctx, sw.prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
opts := []clientv3.OpOption{clientv3.WithSerializable()}
if !sw.exclusive {
opts = append(opts, clientv3.WithPrefix())
}
resp, err := sw.etcdCli.Get(ctx, sw.prefix, opts...)
if err != nil {
return err
}

View File

@ -25,7 +25,7 @@ func TestSessionDiscoverer(t *testing.T) {
etcdClient, err := etcd.GetEmbedEtcdClient()
assert.NoError(t, err)
targetVersion := "0.1.0"
d := NewSessionDiscoverer(etcdClient, "session/", targetVersion)
d := NewSessionDiscoverer(etcdClient, "session/", false, targetVersion)
s := d.NewVersionedState()
assert.True(t, s.Version.EQ(typeutil.VersionInt64(-1)))
@ -95,7 +95,7 @@ func TestSessionDiscoverer(t *testing.T) {
assert.ErrorIs(t, err, io.EOF)
// Do a init discover here.
d = NewSessionDiscoverer(etcdClient, "session/", targetVersion)
d = NewSessionDiscoverer(etcdClient, "session/", false, targetVersion)
err = d.Discover(ctx, func(state VersionedState) error {
// balance attributes
sessions := state.Sessions()

View File

@ -28,9 +28,15 @@ func NewChannelAssignmentBuilder(w types.AssignmentDiscoverWatcher) Builder {
}
// NewSessionBuilder creates a new resolver builder.
// Multiple sessions are allowed, use the role as prefix.
func NewSessionBuilder(c *clientv3.Client, role string) Builder {
// TODO: use 2.5.0 after 2.5.0 released.
return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, "2.4.0"))
return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, false, "2.4.0"))
}
// NewSessionExclusiveBuilder creates a new resolver builder with exclusive.
// Only one session is allowed, not use the prefix, only use the role directly.
func NewSessionExclusiveBuilder(c *clientv3.Client, role string) Builder {
return newBuilder(SessionResolverScheme, discoverer.NewSessionDiscoverer(c, role, true, "2.4.0"))
}
// newBuilder creates a new resolver builder.

18
pkg/log/fields.go Normal file
View File

@ -0,0 +1,18 @@
package log
import "go.uber.org/zap"
const (
FieldNameModule = "module"
FieldNameComponent = "component"
)
// FieldModule returns a zap field with the module name.
func FieldModule(module string) zap.Field {
return zap.String(FieldNameModule, module)
}
// FieldComponent returns a zap field with the component name.
func FieldComponent(component string) zap.Field {
return zap.String(FieldNameComponent, component)
}

View File

@ -143,7 +143,7 @@ func WithReqID(ctx context.Context, reqID int64) context.Context {
// WithModule adds given module field to the logger in ctx
func WithModule(ctx context.Context, module string) context.Context {
fields := []zap.Field{zap.String("module", module)}
fields := []zap.Field{zap.String(FieldNameModule, module)}
return WithFields(ctx, fields...)
}