mirror of https://github.com/milvus-io/milvus.git
Refine startup workflow of proxy (#13459)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/13459/merge
parent
74cf787c1d
commit
8a488f1f20
|
@ -48,11 +48,6 @@ import (
|
|||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
const (
|
||||
// GRPCMaxMagSize is the max size of grpc message.
|
||||
GRPCMaxMagSize = 2 << 30
|
||||
)
|
||||
|
||||
var Params paramtable.GrpcServerConfig
|
||||
|
||||
// Server is the Proxy Server
|
||||
|
@ -66,7 +61,7 @@ type Server struct {
|
|||
|
||||
rootCoordClient types.RootCoord
|
||||
dataCoordClient types.DataCoord
|
||||
queryCooedClient types.QueryCoord
|
||||
queryCoordClient types.QueryCoord
|
||||
indexCoordClient types.IndexCoord
|
||||
|
||||
tracer opentracing.Tracer
|
||||
|
@ -90,7 +85,6 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
|
|||
}
|
||||
|
||||
func (s *Server) startGrpcLoop(grpcPort int) {
|
||||
|
||||
defer s.wg.Done()
|
||||
|
||||
var kaep = keepalive.EnforcementPolicy{
|
||||
|
@ -103,13 +97,14 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
|||
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
|
||||
}
|
||||
|
||||
log.Debug("proxy", zap.Int("network port", grpcPort))
|
||||
log.Debug("Proxy server listen on tcp", zap.Int("port", grpcPort))
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
|
||||
if err != nil {
|
||||
log.Warn("proxy", zap.String("Server:failed to listen:", err.Error()))
|
||||
log.Warn("Proxy server failed to listen on", zap.Error(err), zap.Int("port", grpcPort))
|
||||
s.grpcErrChan <- err
|
||||
return
|
||||
}
|
||||
log.Debug("Proxy server already listen on tcp", zap.Int("port", grpcPort))
|
||||
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
defer cancel()
|
||||
|
@ -120,149 +115,212 @@ func (s *Server) startGrpcLoop(grpcPort int) {
|
|||
grpc.KeepaliveParams(kasp),
|
||||
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
|
||||
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
|
||||
grpc.MaxRecvMsgSize(GRPCMaxMagSize),
|
||||
grpc.UnaryInterceptor(ot.UnaryServerInterceptor(opts...)),
|
||||
grpc.StreamInterceptor(ot.StreamServerInterceptor(opts...)))
|
||||
proxypb.RegisterProxyServer(s.grpcServer, s)
|
||||
milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
|
||||
|
||||
log.Debug("create Proxy grpc server",
|
||||
zap.Any("enforcement policy", kaep),
|
||||
zap.Any("server parameters", kasp))
|
||||
|
||||
log.Debug("waiting for Proxy grpc server to be ready")
|
||||
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
|
||||
|
||||
log.Debug("Proxy grpc server has been ready, serve grpc requests on listen")
|
||||
if err := s.grpcServer.Serve(lis); err != nil {
|
||||
log.Warn("failed to serve on Proxy's listener", zap.Error(err))
|
||||
s.grpcErrChan <- err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Start start the Proxy Server
|
||||
func (s *Server) Run() error {
|
||||
|
||||
log.Debug("init Proxy server")
|
||||
if err := s.init(); err != nil {
|
||||
log.Warn("init Proxy server failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy node init done ...")
|
||||
log.Debug("init Proxy server done")
|
||||
|
||||
log.Debug("start Proxy server")
|
||||
if err := s.start(); err != nil {
|
||||
log.Warn("start Proxy server failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy node start done ...")
|
||||
log.Debug("start Proxy server done")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) init() error {
|
||||
var err error
|
||||
Params.InitOnce(typeutil.ProxyRole)
|
||||
|
||||
proxy.Params.InitOnce()
|
||||
log.Debug("init params done ...")
|
||||
|
||||
// NetworkPort & IP don't matter here, NetworkAddress matters
|
||||
proxy.Params.ProxyCfg.NetworkPort = Params.Port
|
||||
proxy.Params.ProxyCfg.IP = Params.IP
|
||||
log.Debug("init Proxy service's parameter table done")
|
||||
|
||||
proxy.Params.ProxyCfg.NetworkAddress = Params.GetAddress()
|
||||
if !paramtable.CheckPortAvailable(Params.Port) {
|
||||
// as the entry of Milvus, we'd better not to use another port
|
||||
return fmt.Errorf("port %d already in use", Params.Port)
|
||||
}
|
||||
|
||||
closer := trace.InitTracing(fmt.Sprintf("proxy ip: %s, port: %d", Params.IP, Params.Port))
|
||||
proxy.Params.InitOnce()
|
||||
proxy.Params.ProxyCfg.NetworkAddress = Params.GetAddress()
|
||||
log.Debug("init Proxy's parameter table done", zap.String("address", Params.GetAddress()))
|
||||
|
||||
serviceName := fmt.Sprintf("Proxy ip: %s, port: %d", Params.IP, Params.Port)
|
||||
closer := trace.InitTracing(serviceName)
|
||||
s.closer = closer
|
||||
|
||||
log.Debug("proxy", zap.String("proxy host", Params.IP))
|
||||
log.Debug("proxy", zap.Int("proxy port", Params.Port))
|
||||
log.Debug("proxy", zap.String("proxy address", Params.GetAddress()))
|
||||
log.Debug("init Proxy's tracer done", zap.String("service name", serviceName))
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.startGrpcLoop(Params.Port)
|
||||
// wait for grpc server loop start
|
||||
err = <-s.grpcErrChan
|
||||
log.Debug("create grpc server ...")
|
||||
if err != nil {
|
||||
log.Debug("waiting for grpc server of Proxy to be started")
|
||||
if err := <-s.grpcErrChan; err != nil {
|
||||
log.Warn("failed to start Proxy's grpc server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("grpc server of Proxy has been started")
|
||||
|
||||
if s.rootCoordClient == nil {
|
||||
var err error
|
||||
log.Debug("create RootCoord client for Proxy")
|
||||
s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.ProxyCfg.MetaRootPath, proxy.Params.ProxyCfg.EtcdEndpoints)
|
||||
if err != nil {
|
||||
log.Debug("Proxy new rootCoordClient failed ", zap.Error(err))
|
||||
log.Warn("failed to create RootCoord client for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("create RootCoord client for Proxy done")
|
||||
}
|
||||
err = s.rootCoordClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("Proxy new rootCoordClient Init ", zap.Error(err))
|
||||
|
||||
log.Debug("init RootCoord client for Proxy")
|
||||
if err := s.rootCoordClient.Init(); err != nil {
|
||||
log.Warn("failed to init RootCoord client for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
err = funcutil.WaitForComponentHealthy(s.ctx, s.rootCoordClient, "RootCoord", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("Proxy WaitForComponentHealthy RootCoord failed ", zap.Error(err))
|
||||
panic(err)
|
||||
log.Debug("init RootCoord client for Proxy done")
|
||||
|
||||
log.Debug("Proxy wait for RootCoord to be healthy")
|
||||
if err := funcutil.WaitForComponentHealthy(s.ctx, s.rootCoordClient, "RootCoord", 1000000, time.Millisecond*200); err != nil {
|
||||
log.Warn("Proxy failed to wait for RootCoord to be healthy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy wait for RootCoord to be healthy done")
|
||||
|
||||
log.Debug("set RootCoord client for Proxy")
|
||||
s.proxy.SetRootCoordClient(s.rootCoordClient)
|
||||
log.Debug("set rootcoord client ...")
|
||||
log.Debug("set RootCoord client for Proxy done")
|
||||
|
||||
if s.dataCoordClient == nil {
|
||||
var err error
|
||||
log.Debug("create DataCoord client for Proxy")
|
||||
s.dataCoordClient, err = dcc.NewClient(s.ctx, proxy.Params.ProxyCfg.MetaRootPath, proxy.Params.ProxyCfg.EtcdEndpoints)
|
||||
if err != nil {
|
||||
log.Debug("Proxy new dataCoordClient failed ", zap.Error(err))
|
||||
log.Warn("failed to create DataCoord client for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = s.dataCoordClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("Proxy dataCoordClient init failed ", zap.Error(err))
|
||||
return err
|
||||
log.Debug("create DataCoord client for Proxy done")
|
||||
}
|
||||
|
||||
log.Debug("init DataCoord client for Proxy")
|
||||
if err := s.dataCoordClient.Init(); err != nil {
|
||||
log.Warn("failed to init DataCoord client for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("init DataCoord client for Proxy done")
|
||||
|
||||
log.Debug("Proxy wait for DataCoord to be healthy")
|
||||
if err := funcutil.WaitForComponentHealthy(s.ctx, s.dataCoordClient, "DataCoord", 1000000, time.Millisecond*200); err != nil {
|
||||
log.Warn("Proxy failed to wait for DataCoord to be healthy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy wait for DataCoord to be healthy done")
|
||||
|
||||
log.Debug("set DataCoord client for Proxy")
|
||||
s.proxy.SetDataCoordClient(s.dataCoordClient)
|
||||
log.Debug("set data coordinator address ...")
|
||||
log.Debug("set DataCoord client for Proxy done")
|
||||
|
||||
if s.indexCoordClient == nil {
|
||||
var err error
|
||||
log.Debug("create IndexCoord client for Proxy")
|
||||
s.indexCoordClient, err = icc.NewClient(s.ctx, proxy.Params.ProxyCfg.MetaRootPath, proxy.Params.ProxyCfg.EtcdEndpoints)
|
||||
if err != nil {
|
||||
log.Debug("Proxy new indexCoordClient failed ", zap.Error(err))
|
||||
log.Warn("failed to create IndexCoord client for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = s.indexCoordClient.Init()
|
||||
if err != nil {
|
||||
log.Debug("Proxy indexCoordClient init failed ", zap.Error(err))
|
||||
return err
|
||||
log.Debug("create IndexCoord client for Proxy done")
|
||||
}
|
||||
|
||||
log.Debug("init IndexCoord client for Proxy")
|
||||
if err := s.indexCoordClient.Init(); err != nil {
|
||||
log.Warn("failed to init IndexCoord client for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("init IndexCoord client for Proxy done")
|
||||
|
||||
log.Debug("Proxy wait for IndexCoord to be healthy")
|
||||
if err := funcutil.WaitForComponentHealthy(s.ctx, s.indexCoordClient, "IndexCoord", 1000000, time.Millisecond*200); err != nil {
|
||||
log.Warn("Proxy failed to wait for IndexCoord to be healthy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy wait for IndexCoord to be healthy done")
|
||||
|
||||
log.Debug("set IndexCoord client for Proxy")
|
||||
s.proxy.SetIndexCoordClient(s.indexCoordClient)
|
||||
log.Debug("set index coordinator client ...")
|
||||
log.Debug("set IndexCoord client for Proxy done")
|
||||
|
||||
if s.queryCooedClient == nil {
|
||||
s.queryCooedClient, err = qcc.NewClient(s.ctx, proxy.Params.ProxyCfg.MetaRootPath, proxy.Params.ProxyCfg.EtcdEndpoints)
|
||||
if s.queryCoordClient == nil {
|
||||
var err error
|
||||
log.Debug("create QueryCoord client for Proxy")
|
||||
s.queryCoordClient, err = qcc.NewClient(s.ctx, proxy.Params.ProxyCfg.MetaRootPath, proxy.Params.ProxyCfg.EtcdEndpoints)
|
||||
if err != nil {
|
||||
log.Warn("failed to create QueryCoord client for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("create QueryCoord client for Proxy done")
|
||||
}
|
||||
err = s.queryCooedClient.Init()
|
||||
if err != nil {
|
||||
|
||||
log.Debug("init QueryCoord client for Proxy")
|
||||
if err := s.queryCoordClient.Init(); err != nil {
|
||||
log.Warn("failed to init QueryCoord client for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
s.proxy.SetQueryCoordClient(s.queryCooedClient)
|
||||
log.Debug("set query coordinator client ...")
|
||||
log.Debug("init QueryCoord client for Proxy done")
|
||||
|
||||
log.Debug("Proxy wait for QueryCoord to be healthy")
|
||||
if err := funcutil.WaitForComponentHealthy(s.ctx, s.queryCoordClient, "QueryCoord", 1000000, time.Millisecond*200); err != nil {
|
||||
log.Warn("Proxy failed to wait for QueryCoord to be healthy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy wait for QueryCoord to be healthy done")
|
||||
|
||||
log.Debug("set QueryCoord client for Proxy")
|
||||
s.proxy.SetQueryCoordClient(s.queryCoordClient)
|
||||
log.Debug("set QueryCoord client for Proxy done")
|
||||
|
||||
log.Debug(fmt.Sprintf("update Proxy's state to %s", internalpb.StateCode_Initializing.String()))
|
||||
s.proxy.UpdateStateCode(internalpb.StateCode_Initializing)
|
||||
log.Debug("proxy", zap.Any("state of proxy", internalpb.StateCode_Initializing))
|
||||
|
||||
log.Debug("init Proxy")
|
||||
if err := s.proxy.Init(); err != nil {
|
||||
log.Debug("proxy", zap.String("proxy init error", err.Error()))
|
||||
log.Warn("failed to init Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("init Proxy done")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) start() error {
|
||||
err := s.proxy.Start()
|
||||
if err != nil {
|
||||
log.Error("Proxy start failed", zap.Error(err))
|
||||
}
|
||||
err = s.proxy.Register()
|
||||
if err != nil {
|
||||
log.Error("Proxy register service failed ", zap.Error(err))
|
||||
if err := s.proxy.Start(); err != nil {
|
||||
log.Warn("failed to start Proxy server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.proxy.Register(); err != nil {
|
||||
log.Warn("failed to register Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@ import (
|
|||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
|
@ -250,7 +252,16 @@ func (m *MockQueryCoord) SetDataCoord(types.DataCoord) error {
|
|||
}
|
||||
|
||||
func (m *MockQueryCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
return nil, nil
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{
|
||||
NodeID: int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
|
||||
Role: "MockQueryCoord",
|
||||
StateCode: internalpb.StateCode_Healthy,
|
||||
ExtraInfo: nil,
|
||||
},
|
||||
SubcomponentStates: nil,
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *MockQueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
|
@ -631,7 +642,7 @@ func Test_NewServer(t *testing.T) {
|
|||
server.proxy = &MockProxy{}
|
||||
server.rootCoordClient = &MockRootCoord{}
|
||||
server.indexCoordClient = &MockIndexCoord{}
|
||||
server.queryCooedClient = &MockQueryCoord{}
|
||||
server.queryCoordClient = &MockQueryCoord{}
|
||||
server.dataCoordClient = &MockDataCoord{}
|
||||
|
||||
t.Run("Run", func(t *testing.T) {
|
||||
|
|
|
@ -34,7 +34,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
|
@ -137,57 +136,31 @@ func (node *Proxy) initSession() error {
|
|||
|
||||
// Init initialize proxy.
|
||||
func (node *Proxy) Init() error {
|
||||
err := node.initSession()
|
||||
if err != nil {
|
||||
log.Error("Proxy init session failed", zap.Error(err))
|
||||
log.Debug("init session for Proxy")
|
||||
if err := node.initSession(); err != nil {
|
||||
log.Warn("failed to init Proxy's session", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("init session for Proxy done")
|
||||
|
||||
log.Debug("refresh configuration of Proxy")
|
||||
Params.ProxyCfg.Refresh()
|
||||
// wait for datacoord state changed to Healthy
|
||||
if node.dataCoord != nil {
|
||||
log.Debug("Proxy wait for DataCoord ready")
|
||||
err := funcutil.WaitForComponentHealthy(node.ctx, node.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("Proxy wait for DataCoord ready failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy DataCoord is ready")
|
||||
}
|
||||
|
||||
// wait for queryCoord state changed to Healthy
|
||||
if node.queryCoord != nil {
|
||||
log.Debug("Proxy wait for QueryCoord ready")
|
||||
err := funcutil.WaitForComponentHealthy(node.ctx, node.queryCoord, "QueryCoord", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("Proxy wait for QueryCoord ready failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy QueryCoord is ready")
|
||||
}
|
||||
|
||||
// wait for indexcoord state changed to Healthy
|
||||
if node.indexCoord != nil {
|
||||
log.Debug("Proxy wait for IndexCoord ready")
|
||||
err := funcutil.WaitForComponentHealthy(node.ctx, node.indexCoord, "IndexCoord", 1000000, time.Millisecond*200)
|
||||
if err != nil {
|
||||
log.Debug("Proxy wait for IndexCoord ready failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("Proxy IndexCoord is ready")
|
||||
}
|
||||
log.Debug("refresh configuration of Proxy done")
|
||||
|
||||
if node.queryCoord != nil {
|
||||
log.Debug("create query channel for Proxy")
|
||||
resp, err := node.queryCoord.CreateQueryChannel(node.ctx, &querypb.CreateQueryChannelRequest{})
|
||||
if err != nil {
|
||||
log.Debug("Proxy CreateQueryChannel failed", zap.Error(err))
|
||||
log.Warn("failed to create query channel for Proxy", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
log.Debug("Proxy CreateQueryChannel failed", zap.String("reason", resp.Status.Reason))
|
||||
|
||||
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
log.Warn("failed to create query channel for Proxy",
|
||||
zap.String("error_code", resp.Status.ErrorCode.String()),
|
||||
zap.String("reason", resp.Status.Reason))
|
||||
return errors.New(resp.Status.Reason)
|
||||
}
|
||||
log.Debug("Proxy CreateQueryChannel success")
|
||||
|
||||
// TODO SearchResultChannelNames and RetrieveResultChannelNames should not be part in the Param table
|
||||
// we should maintain a separate map for search result
|
||||
|
@ -195,49 +168,85 @@ func (node *Proxy) Init() error {
|
|||
Params.ProxyCfg.RetrieveResultChannelNames = []string{resp.QueryResultChannel}
|
||||
log.Debug("Proxy CreateQueryChannel success", zap.Any("SearchResultChannelNames", Params.ProxyCfg.SearchResultChannelNames))
|
||||
log.Debug("Proxy CreateQueryChannel success", zap.Any("RetrieveResultChannelNames", Params.ProxyCfg.RetrieveResultChannelNames))
|
||||
log.Debug("create query channel for Proxy done", zap.String("QueryResultChannel", resp.QueryResultChannel))
|
||||
}
|
||||
|
||||
m := map[string]interface{}{
|
||||
"PulsarAddress": Params.ProxyCfg.PulsarAddress,
|
||||
"PulsarBufSize": 1024}
|
||||
err = node.msFactory.SetParams(m)
|
||||
if err != nil {
|
||||
log.Debug("set parameters for ms factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", m))
|
||||
if err := node.msFactory.SetParams(m); err != nil {
|
||||
log.Warn("failed to set parameters for ms factory",
|
||||
zap.Error(err),
|
||||
zap.String("role", typeutil.ProxyRole),
|
||||
zap.Any("parameters", m))
|
||||
return err
|
||||
}
|
||||
log.Debug("set parameters for ms factory done", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", m))
|
||||
|
||||
log.Debug("create id allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, Params.ProxyCfg.ProxyID)
|
||||
if err != nil {
|
||||
log.Warn("failed to create id allocator",
|
||||
zap.Error(err),
|
||||
zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
return err
|
||||
}
|
||||
|
||||
node.idAllocator = idAllocator
|
||||
log.Debug("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
|
||||
log.Debug("create timestamp allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
tsoAllocator, err := newTimestampAllocator(node.ctx, node.rootCoord, Params.ProxyCfg.ProxyID)
|
||||
if err != nil {
|
||||
log.Warn("failed to create timestamp allocator",
|
||||
zap.Error(err),
|
||||
zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
return err
|
||||
}
|
||||
node.tsoAllocator = tsoAllocator
|
||||
log.Debug("create timestamp allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
|
||||
log.Debug("create segment id assigner", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
segAssigner, err := newSegIDAssigner(node.ctx, node.dataCoord, node.lastTick)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
log.Warn("failed to create segment id assigner",
|
||||
zap.Error(err),
|
||||
zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
return err
|
||||
}
|
||||
node.segAssigner = segAssigner
|
||||
node.segAssigner.PeerID = Params.ProxyCfg.ProxyID
|
||||
log.Debug("create segment id assigner done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.ProxyID))
|
||||
|
||||
log.Debug("create channels manager", zap.String("role", typeutil.ProxyRole))
|
||||
dmlChannelsFunc := getDmlChannelsFunc(node.ctx, node.rootCoord)
|
||||
dqlChannelsFunc := getDqlChannelsFunc(node.ctx, node.session.ServerID, node.queryCoord)
|
||||
chMgr := newChannelsMgrImpl(dmlChannelsFunc, defaultInsertRepackFunc, dqlChannelsFunc, nil, node.msFactory)
|
||||
node.chMgr = chMgr
|
||||
log.Debug("create channels manager done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
log.Debug("create task scheduler", zap.String("role", typeutil.ProxyRole))
|
||||
node.sched, err = newTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.msFactory)
|
||||
if err != nil {
|
||||
log.Warn("failed to create task scheduler", zap.Error(err), zap.String("role", typeutil.ProxyRole))
|
||||
return err
|
||||
}
|
||||
log.Debug("create task scheduler done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
log.Debug("create channels time ticker", zap.String("role", typeutil.ProxyRole))
|
||||
node.chTicker = newChannelsTimeTicker(node.ctx, channelMgrTickerInterval, []string{}, node.sched.getPChanStatistics, tsoAllocator)
|
||||
log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
log.Debug("create metrics cache manager", zap.String("role", typeutil.ProxyRole))
|
||||
node.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
|
||||
log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
log.Debug("init meta cache", zap.String("role", typeutil.ProxyRole))
|
||||
if err := InitMetaCache(node.rootCoord); err != nil {
|
||||
log.Warn("failed to init meta cache", zap.Error(err), zap.String("role", typeutil.ProxyRole))
|
||||
return err
|
||||
}
|
||||
log.Debug("init meta cache done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -315,32 +324,33 @@ func (node *Proxy) sendChannelsTimeTickLoop() {
|
|||
|
||||
// Start starts a proxy node.
|
||||
func (node *Proxy) Start() error {
|
||||
err := InitMetaCache(node.rootCoord)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("init global meta cache ...")
|
||||
|
||||
log.Debug("start task scheduler", zap.String("role", typeutil.ProxyRole))
|
||||
if err := node.sched.Start(); err != nil {
|
||||
log.Warn("failed to start task scheduler", zap.Error(err), zap.String("role", typeutil.ProxyRole))
|
||||
return err
|
||||
}
|
||||
log.Debug("start scheduler ...")
|
||||
log.Debug("start task scheduler done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
log.Debug("start id allocator", zap.String("role", typeutil.ProxyRole))
|
||||
if err := node.idAllocator.Start(); err != nil {
|
||||
log.Warn("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.ProxyRole))
|
||||
return err
|
||||
}
|
||||
log.Debug("start id allocator ...")
|
||||
log.Debug("start id allocator done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
log.Debug("start segment id assigner", zap.String("role", typeutil.ProxyRole))
|
||||
if err := node.segAssigner.Start(); err != nil {
|
||||
log.Warn("failed to start segment id assigner", zap.Error(err), zap.String("role", typeutil.ProxyRole))
|
||||
return err
|
||||
}
|
||||
log.Debug("start seg assigner ...")
|
||||
log.Debug("start segment id assigner done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
err = node.chTicker.start()
|
||||
if err != nil {
|
||||
log.Debug("start channels time ticker", zap.String("role", typeutil.ProxyRole))
|
||||
if err := node.chTicker.start(); err != nil {
|
||||
log.Warn("failed to start channels time ticker", zap.Error(err), zap.String("role", typeutil.ProxyRole))
|
||||
return err
|
||||
}
|
||||
log.Debug("start channelsTimeTicker")
|
||||
log.Debug("start channels time ticker done", zap.String("role", typeutil.ProxyRole))
|
||||
|
||||
node.sendChannelsTimeTickLoop()
|
||||
|
||||
|
@ -349,11 +359,12 @@ func (node *Proxy) Start() error {
|
|||
cb()
|
||||
}
|
||||
|
||||
Params.ProxyCfg.CreatedTime = time.Now()
|
||||
Params.ProxyCfg.UpdatedTime = time.Now()
|
||||
now := time.Now()
|
||||
Params.ProxyCfg.CreatedTime = now
|
||||
Params.ProxyCfg.UpdatedTime = now
|
||||
|
||||
log.Debug("update state code", zap.String("role", typeutil.ProxyRole), zap.String("State", internalpb.StateCode_Healthy.String()))
|
||||
node.UpdateStateCode(internalpb.StateCode_Healthy)
|
||||
log.Debug("Proxy", zap.Any("State", node.stateCode.Load()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue