Register the service when the component state is healthy (#13248)

Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
pull/13428/head
cai.zhang 2021-12-15 11:47:10 +08:00 committed by GitHub
parent 6ef066d858
commit 9f23fc7f2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 322 additions and 193 deletions

View File

@ -211,20 +211,33 @@ func (s *Server) QuitSignal() <-chan struct{} {
// Register register data service at etcd
func (s *Server) Register() error {
s.session.Register()
go s.session.LivenessCheck(s.serverLoopCtx, func() {
log.Error("DataCoord disconnected from etcd, process will exit", zap.Int64("ServerID", s.session.ServerID))
if err := s.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
return nil
}
func (s *Server) initSession() error {
s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
if s.session == nil {
return errors.New("failed to initialize session")
}
s.session.Init(typeutil.DataCoordRole, Params.Address, true)
Params.NodeID = s.session.ServerID
Params.SetLogger(typeutil.UniqueID(-1))
Params.SetLogger(Params.NodeID)
return nil
}
// Init change server state to Initializing
func (s *Server) Init() error {
atomic.StoreInt64(&s.isServing, ServerStateInitializing)
return nil
return s.initSession()
}
// Start initialize `Server` members and start loops, follow steps are taken:
@ -404,14 +417,6 @@ func (s *Server) startServerLoop() {
s.startWatchService(s.serverLoopCtx)
s.startFlushLoop(s.serverLoopCtx)
s.garbageCollector.start()
go s.session.LivenessCheck(s.serverLoopCtx, func() {
log.Error("DataCoord disconnected from etcd, process will exit", zap.Int64("ServerID", s.session.ServerID))
if err := s.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
}
// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream

View File

@ -2227,12 +2227,12 @@ func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Se
return newMockRootCoordService(), nil
}
assert.Nil(t, err)
err = svr.Register()
assert.Nil(t, err)
err = svr.Init()
assert.Nil(t, err)
err = svr.Start()
assert.Nil(t, err)
err = svr.Register()
assert.Nil(t, err)
return svr
}

View File

@ -174,13 +174,8 @@ func (node *DataNode) SetNodeID(id UniqueID) {
// Register register datanode to etcd
func (node *DataNode) Register() error {
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
Params.NodeID = node.session.ServerID
node.NodeID = node.session.ServerID
Params.SetLogger(Params.NodeID)
// Start node watch node
go node.StartWatchChannels(node.ctx)
node.session.Register()
// Start liveness check
go node.session.LivenessCheck(node.ctx, func() {
log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
@ -191,13 +186,18 @@ func (node *DataNode) Register() error {
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
Params.initMsgChannelSubName()
//TODO reset
//Params.initLogCfg()
log.Debug("DataNode Init",
zap.String("MsgChannelSubName", Params.MsgChannelSubName),
)
return nil
}
func (node *DataNode) initSession() error {
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
if node.session == nil {
return errors.New("failed to initialize session")
}
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
Params.NodeID = node.session.ServerID
node.NodeID = node.session.ServerID
Params.SetLogger(Params.NodeID)
return nil
}
@ -206,6 +206,17 @@ func (node *DataNode) Init() error {
log.Debug("DataNode Init",
zap.String("TimeTickChannelName", Params.TimeTickChannelName),
)
if err := node.initSession(); err != nil {
log.Error("DataNode init session failed", zap.Error(err))
return err
}
Params.initMsgChannelSubName()
//TODO reset
//Params.initLogCfg()
log.Debug("DataNode Init",
zap.String("MsgChannelSubName", Params.MsgChannelSubName),
)
return nil
}
@ -441,6 +452,9 @@ func (node *DataNode) Start() error {
go node.compactionExecutor.start(node.ctx)
// Start node watch node
go node.StartWatchChannels(node.ctx)
Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now()

View File

@ -63,6 +63,7 @@ func TestConnectionManager(t *testing.T) {
rootcoordpb.RegisterRootCoordServer(grpcServer, rootCoord)
go grpcServer.Serve(lis)
session.Init(typeutil.RootCoordRole, "127.0.0.1:9999", true)
session.Register()
assert.Eventually(t, func() bool {
rootCoord, ok := cm.GetRootCoordClient()
return rootCoord != nil && ok
@ -79,6 +80,7 @@ func TestConnectionManager(t *testing.T) {
querypb.RegisterQueryCoordServer(grpcServer, queryCoord)
go grpcServer.Serve(lis)
session.Init(typeutil.QueryCoordRole, "127.0.0.1:9999", true)
session.Register()
assert.Eventually(t, func() bool {
queryCoord, ok := cm.GetQueryCoordClient()
return queryCoord != nil && ok
@ -95,6 +97,7 @@ func TestConnectionManager(t *testing.T) {
datapb.RegisterDataCoordServer(grpcServer, dataCoord)
go grpcServer.Serve(lis)
session.Init(typeutil.DataCoordRole, "127.0.0.1:9999", true)
session.Register()
assert.Eventually(t, func() bool {
dataCoord, ok := cm.GetDataCoordClient()
return dataCoord != nil && ok
@ -111,6 +114,7 @@ func TestConnectionManager(t *testing.T) {
indexpb.RegisterIndexCoordServer(grpcServer, indexCoord)
go grpcServer.Serve(lis)
session.Init(typeutil.IndexCoordRole, "127.0.0.1:9999", true)
session.Register()
assert.Eventually(t, func() bool {
indexCoord, ok := cm.GetIndexCoordClient()
return indexCoord != nil && ok
@ -127,6 +131,7 @@ func TestConnectionManager(t *testing.T) {
querypb.RegisterQueryNodeServer(grpcServer, queryNode)
go grpcServer.Serve(lis)
session.Init(typeutil.QueryNodeRole, "127.0.0.1:9999", true)
session.Register()
assert.Eventually(t, func() bool {
queryNodes, ok := cm.GetQueryNodeClients()
return len(queryNodes) == 1 && ok
@ -143,6 +148,7 @@ func TestConnectionManager(t *testing.T) {
datapb.RegisterDataNodeServer(grpcServer, dataNode)
go grpcServer.Serve(lis)
session.Init(typeutil.DataNodeRole, "127.0.0.1:9999", true)
session.Register()
assert.Eventually(t, func() bool {
dataNodes, ok := cm.GetDataNodeClients()
return len(dataNodes) == 1 && ok
@ -159,6 +165,7 @@ func TestConnectionManager(t *testing.T) {
indexpb.RegisterIndexNodeServer(grpcServer, indexNode)
go grpcServer.Serve(lis)
session.Init(typeutil.IndexNodeRole, "127.0.0.1:9999", true)
session.Register()
assert.Eventually(t, func() bool {
indexNodes, ok := cm.GetIndexNodeClients()
return len(indexNodes) == 1 && ok

View File

@ -88,14 +88,7 @@ func (s *Server) init() error {
datacoord.Params.Port = Params.Port
datacoord.Params.Address = Params.Address
err := s.dataCoord.Register()
if err != nil {
log.Debug("DataCoord Register etcd failed", zap.Error(err))
return err
}
log.Debug("DataCoord Register etcd success")
err = s.startGrpc()
err := s.startGrpc()
if err != nil {
log.Debug("DataCoord startGrpc failed", zap.Error(err))
return err
@ -161,7 +154,17 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
func (s *Server) start() error {
return s.dataCoord.Start()
err := s.dataCoord.Start()
if err != nil {
log.Error("DataCoord start failed", zap.Error(err))
return err
}
err = s.dataCoord.Register()
if err != nil {
log.Debug("DataCoord register service failed", zap.Error(err))
return err
}
return nil
}
// Stop stops the DataCoord server gracefully.

View File

@ -84,11 +84,6 @@ func (s *Server) init() error {
closer := trace.InitTracing("IndexCoord")
s.closer = closer
if err := s.indexcoord.Register(); err != nil {
log.Error("IndexCoord", zap.Any("register session error", err))
return err
}
s.loopWg.Add(1)
go s.startGrpcLoop(indexcoord.Params.Port)
// wait for grpc IndexCoord loop start
@ -100,6 +95,7 @@ func (s *Server) init() error {
log.Error("IndexCoord", zap.Any("init error", err))
return err
}
return nil
}
@ -109,6 +105,11 @@ func (s *Server) start() error {
return err
}
log.Debug("indexCoord started")
if err := s.indexcoord.Register(); err != nil {
log.Error("IndexCoord", zap.Any("register session error", err))
return err
}
log.Debug("IndexCoord registers service successfully")
return nil
}

View File

@ -133,13 +133,6 @@ func (s *Server) init() error {
}
}()
err = s.indexnode.Register()
if err != nil {
log.Error("IndexNode Register etcd failed", zap.Error(err))
return err
}
log.Debug("IndexNode Register etcd success")
s.loopWg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
@ -154,6 +147,7 @@ func (s *Server) init() error {
log.Error("IndexNode Init failed", zap.Error(err))
return err
}
return nil
}
@ -163,6 +157,12 @@ func (s *Server) start() error {
if err != nil {
return err
}
err = s.indexnode.Register()
if err != nil {
log.Error("IndexNode Register etcd failed", zap.Error(err))
return err
}
log.Debug("IndexNode Register etcd success")
return nil
}

View File

@ -172,12 +172,6 @@ func (s *Server) init() error {
log.Debug("proxy", zap.Int("proxy port", Params.Port))
log.Debug("proxy", zap.String("proxy address", Params.Address))
err = s.proxy.Register()
if err != nil {
log.Debug("Proxy Register etcd failed ", zap.Error(err))
return err
}
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
@ -264,7 +258,16 @@ func (s *Server) init() error {
}
func (s *Server) start() error {
return s.proxy.Start()
err := s.proxy.Start()
if err != nil {
log.Error("Proxy start failed", zap.Error(err))
}
err = s.proxy.Register()
if err != nil {
log.Error("Proxy register service failed ", zap.Error(err))
return err
}
return nil
}
// Stop stop the Proxy Server

View File

@ -108,10 +108,6 @@ func (s *Server) init() error {
closer := trace.InitTracing("querycoord")
s.closer = closer
if err := s.queryCoord.Register(); err != nil {
return err
}
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
@ -261,7 +257,11 @@ func (s *Server) startGrpcLoop(grpcPort int) {
// start starts QueryCoord's grpc service.
func (s *Server) start() error {
return s.queryCoord.Start()
err := s.queryCoord.Start()
if err != nil {
return err
}
return s.queryCoord.Register()
}
// Stop stops QueryCoord's grpc service.

View File

@ -383,7 +383,9 @@ func Test_NewServer(t *testing.T) {
assert.Nil(t, err)
}
// This test will no longer return error immediately.
func TestServer_Run1(t *testing.T) {
t.Skip()
ctx := context.Background()
server, err := NewServer(ctx, nil)
assert.Nil(t, err)

View File

@ -169,15 +169,20 @@ func (s *Server) init() error {
return err
}
if err := s.querynode.Register(); err != nil {
return err
}
return nil
}
// start starts QueryNode's grpc service.
func (s *Server) start() error {
return s.querynode.Start()
if err := s.querynode.Start(); err != nil {
log.Error("QueryNode start failed", zap.Error(err))
return err
}
if err := s.querynode.Register(); err != nil {
log.Error("QueryNode register service failed", zap.Error(err))
return err
}
return nil
}
// startGrpcLoop starts the grpc loop of QueryNode component.

View File

@ -153,12 +153,7 @@ func (s *Server) init() error {
log.Debug("init params done")
err := s.rootCoord.Register()
if err != nil {
return err
}
err = s.startGrpc(Params.Port)
err := s.startGrpc(Params.Port)
if err != nil {
return err
}
@ -258,6 +253,11 @@ func (s *Server) startGrpcLoop(port int) {
func (s *Server) start() error {
log.Debug("RootCoord Core start ...")
if err := s.rootCoord.Start(); err != nil {
log.Error(err.Error())
return err
}
if err := s.rootCoord.Register(); err != nil {
log.Error("RootCoord registers service failed", zap.Error(err))
return err
}
return nil

View File

@ -100,9 +100,6 @@ func TestGrpcService(t *testing.T) {
core, ok := (svr.rootCoord).(*rootcoord.Core)
assert.True(t, ok)
err = core.Register()
assert.Nil(t, err)
err = svr.startGrpc(Params.Port)
assert.Nil(t, err)
svr.rootCoord.UpdateStateCode(internalpb.StateCode_Initializing)
@ -122,6 +119,8 @@ func TestGrpcService(t *testing.T) {
_, err = etcdCli.Put(ctx, path.Join(sessKey, typeutil.ProxyRole+"-100"), string(pnb))
assert.Nil(t, err)
rootcoord.Params.Address = Params.Address
err = core.Init()
assert.Nil(t, err)
@ -216,10 +215,6 @@ func TestGrpcService(t *testing.T) {
return nil
}
rootcoord.Params.Address = Params.Address
err = svr.rootCoord.Register()
assert.Nil(t, err)
err = svr.start()
assert.Nil(t, err)

View File

@ -120,12 +120,25 @@ func NewIndexCoord(ctx context.Context) (*IndexCoord, error) {
// Register register IndexCoord role at etcd.
func (i *IndexCoord) Register() error {
i.session.Register()
go i.session.LivenessCheck(i.loopCtx, func() {
log.Error("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
if err := i.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
return nil
}
func (i *IndexCoord) initSession() error {
i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
if i.session == nil {
return errors.New("failed to initialize session")
}
i.session.Init(typeutil.IndexCoordRole, Params.Address, true)
Params.SetLogger(typeutil.UniqueID(-1))
Params.SetLogger(i.session.ServerID)
return nil
}
@ -134,8 +147,15 @@ func (i *IndexCoord) Init() error {
var initErr error
Params.InitOnce()
i.initOnce.Do(func() {
log.Debug("IndexCoord", zap.Strings("etcd endpoints", Params.EtcdEndpoints))
i.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(internalpb.StateCode)))
err := i.initSession()
if err != nil {
log.Error(err.Error())
initErr = err
return
}
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
@ -150,7 +170,7 @@ func (i *IndexCoord) Init() error {
return err
}
log.Debug("IndexCoord try to connect etcd")
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Error("IndexCoord try to connect etcd failed", zap.Error(err))
initErr = err
@ -250,15 +270,6 @@ func (i *IndexCoord) Start() error {
i.loopWg.Add(1)
go i.watchMetaLoop()
go i.session.LivenessCheck(i.loopCtx, func() {
log.Error("Index Coord disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
if err := i.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
startErr = i.sched.Start()
i.UpdateStateCode(internalpb.StateCode_Healthy)

View File

@ -71,6 +71,7 @@ func (icm *Mock) Register() error {
err := icm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexCoordRole)
session := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
session.Init(typeutil.IndexCoordRole, Params.Address, true)
session.Register()
return err
}

View File

@ -63,11 +63,11 @@ func TestIndexCoord(t *testing.T) {
ic.assignTaskInterval = 200 * time.Millisecond
ic.taskLimit = 20
Params.Init()
err = ic.Register()
assert.Nil(t, err)
err = ic.Init()
assert.Nil(t, err)
err = ic.Register()
assert.Nil(t, err)
err = ic.Start()
assert.Nil(t, err)

View File

@ -31,13 +31,13 @@ func TestGetSystemInfoMetrics(t *testing.T) {
ic, err := NewIndexCoord(ctx)
assert.Nil(t, err)
Params.Init()
err = ic.Register()
assert.Nil(t, err)
err = ic.Init()
assert.Nil(t, err)
err = ic.Start()
assert.Nil(t, err)
err = ic.Register()
assert.Nil(t, err)
t.Run("getSystemInfoMetrics", func(t *testing.T) {
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)

View File

@ -113,13 +113,17 @@ func NewIndexNode(ctx context.Context) (*IndexNode, error) {
// Register register index node at etcd.
func (i *IndexNode) Register() error {
i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
if i.session == nil {
return errors.New("failed to initialize session")
}
i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
Params.NodeID = i.session.ServerID
Params.SetLogger(Params.NodeID)
i.session.Register()
//start liveness check
go i.session.LivenessCheck(i.loopCtx, func() {
log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
if err := i.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
return nil
}
@ -134,19 +138,39 @@ func (i *IndexNode) initKnowhere() {
C.free(unsafe.Pointer(cSimdType))
}
func (i *IndexNode) initSession() error {
i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
if i.session == nil {
return errors.New("failed to initialize session")
}
i.session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
Params.NodeID = i.session.ServerID
Params.SetLogger(Params.NodeID)
return nil
}
// Init initializes the IndexNode component.
func (i *IndexNode) Init() error {
var initErr error = nil
i.initOnce.Do(func() {
Params.Init()
i.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("IndexNode init", zap.Any("State", internalpb.StateCode_Initializing))
log.Debug("IndexNode init", zap.Any("State", i.stateCode.Load().(internalpb.StateCode)))
err := i.initSession()
if err != nil {
log.Error(err.Error())
initErr = err
return
}
log.Debug("IndexNode init session successful", zap.Int64("serverID", i.session.ServerID))
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
i.etcdKV = etcdKV
return err
}
err := retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Error("IndexNode failed to connect to etcd", zap.Error(err))
initErr = err
@ -191,16 +215,6 @@ func (i *IndexNode) Start() error {
Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now()
//start liveness check
go i.session.LivenessCheck(i.loopCtx, func() {
log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
if err := i.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
i.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("IndexNode", zap.Any("State", i.stateCode.Load()))
})

View File

@ -188,6 +188,7 @@ func (inm *Mock) Register() error {
}
session := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
session.Init(typeutil.IndexNodeRole, "localhost:21121", false)
session.Register()
return nil
}

View File

@ -71,14 +71,15 @@ func TestIndexNode(t *testing.T) {
assert.Nil(t, err)
Params.Init()
err = in.Register()
assert.Nil(t, err)
err = in.Init()
assert.Nil(t, err)
err = in.Start()
assert.Nil(t, err)
err = in.Register()
assert.Nil(t, err)
t.Run("CreateIndex FloatVector", func(t *testing.T) {
var insertCodec storage.InsertCodec
@ -476,14 +477,15 @@ func TestCreateIndexFailed(t *testing.T) {
assert.Nil(t, err)
Params.Init()
err = in.Register()
assert.Nil(t, err)
err = in.Init()
assert.Nil(t, err)
err = in.Start()
assert.Nil(t, err)
err = in.Register()
assert.Nil(t, err)
t.Run("CreateIndex error", func(t *testing.T) {
var insertCodec storage.InsertCodec
@ -742,14 +744,15 @@ func TestIndexNode_Error(t *testing.T) {
assert.Nil(t, err)
Params.Init()
err = in.Register()
assert.Nil(t, err)
err = in.Init()
assert.Nil(t, err)
err = in.Start()
assert.Nil(t, err)
err = in.Register()
assert.Nil(t, err)
in.UpdateStateCode(internalpb.StateCode_Initializing)
t.Run("CreateIndex", func(t *testing.T) {

View File

@ -109,11 +109,7 @@ func NewProxy(ctx context.Context, factory msgstream.Factory) (*Proxy, error) {
// Register registers proxy at etcd
func (node *Proxy) Register() error {
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
node.session.Init(typeutil.ProxyRole, Params.NetworkAddress, false)
Params.ProxyID = node.session.ServerID
Params.SetLogger(Params.ProxyID)
Params.initProxySubName()
node.session.Register()
go node.session.LivenessCheck(node.ctx, func() {
log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
if err := node.Stop(); err != nil {
@ -126,8 +122,25 @@ func (node *Proxy) Register() error {
return nil
}
func (node *Proxy) initSession() error {
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
if node.session == nil {
return errors.New("new session failed, maybe etcd cannot be connected")
}
node.session.Init(typeutil.ProxyRole, Params.NetworkAddress, false)
Params.ProxyID = node.session.ServerID
Params.SetLogger(Params.ProxyID)
return nil
}
// Init initialize proxy.
func (node *Proxy) Init() error {
err := node.initSession()
if err != nil {
log.Error("Proxy init session failed", zap.Error(err))
return err
}
Params.initProxySubName()
// wait for datacoord state changed to Healthy
if node.dataCoord != nil {
log.Debug("Proxy wait for dataCoord ready")
@ -185,7 +198,7 @@ func (node *Proxy) Init() error {
m := map[string]interface{}{
"PulsarAddress": Params.PulsarAddress,
"PulsarBufSize": 1024}
err := node.msFactory.SetParams(m)
err = node.msFactory.SetParams(m)
if err != nil {
return err
}

View File

@ -419,11 +419,6 @@ func TestProxy(t *testing.T) {
Params.Init()
log.Info("Initialize parameter table of proxy")
// register proxy
err = proxy.Register()
assert.NoError(t, err)
log.Info("Register proxy done")
rootCoordClient, err := rcc.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints)
assert.NoError(t, err)
err = rootCoordClient.Init()
@ -468,6 +463,11 @@ func TestProxy(t *testing.T) {
err = proxy.Start()
assert.NoError(t, err)
assert.Equal(t, internalpb.StateCode_Healthy, proxy.stateCode.Load().(internalpb.StateCode))
// register proxy
err = proxy.Register()
assert.NoError(t, err)
log.Info("Register proxy done")
defer func() {
err := proxy.Stop()
assert.NoError(t, err)

View File

@ -36,6 +36,7 @@ func TestShuffleChannelsToQueryNode(t *testing.T) {
assert.Nil(t, err)
clusterSession := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
clusterSession.Init(typeutil.QueryCoordRole, Params.Address, true)
clusterSession.Register()
meta, err := newMeta(baseCtx, kv, nil, nil)
assert.Nil(t, err)
cluster := &queryNodeCluster{
@ -77,7 +78,6 @@ func TestShuffleChannelsToQueryNode(t *testing.T) {
nodeID := node.queryNodeID
cluster.registerNode(baseCtx, nodeSession, nodeID, disConnect)
waitQueryNodeOnline(cluster, nodeID)
err = shuffleChannelsToQueryNode(baseCtx, reqs, cluster, false, nil)
assert.Nil(t, err)

View File

@ -397,6 +397,7 @@ func TestReloadClusterFromKV(t *testing.T) {
assert.Nil(t, err)
clusterSession := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
clusterSession.Init(typeutil.QueryCoordRole, Params.Address, true)
clusterSession.Register()
cluster := &queryNodeCluster{
ctx: baseCtx,
client: kv,
@ -425,6 +426,7 @@ func TestReloadClusterFromKV(t *testing.T) {
assert.Nil(t, err)
clusterSession := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
clusterSession.Init(typeutil.QueryCoordRole, Params.Address, true)
clusterSession.Register()
cluster := &queryNodeCluster{
client: kv,
nodes: make(map[int64]Node),
@ -472,6 +474,7 @@ func TestGrpcRequest(t *testing.T) {
assert.Nil(t, err)
clusterSession := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
clusterSession.Init(typeutil.QueryCoordRole, Params.Address, true)
clusterSession.Register()
meta, err := newMeta(baseCtx, kv, nil, nil)
assert.Nil(t, err)
cluster := &queryNodeCluster{

View File

@ -106,6 +106,7 @@ func (qs *queryNodeServerMock) Register() error {
qs.queryNodeID = qs.session.ServerID
log.Debug("query nodeID", zap.Int64("nodeID", qs.queryNodeID))
log.Debug("query node address", zap.String("address", qs.session.Address))
qs.session.Register()
return nil
}

View File

@ -19,15 +19,14 @@ package querycoord
import (
"context"
"errors"
"math"
"sort"
"syscall"
"fmt"
"math"
"math/rand"
"sort"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/golang/protobuf/proto"
@ -95,16 +94,32 @@ type QueryCoord struct {
// Register register query service at etcd
func (qc *QueryCoord) Register() error {
log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
qc.session.Register()
go qc.session.LivenessCheck(qc.loopCtx, func() {
log.Error("Query Coord disconnected from etcd, process will exit", zap.Int64("Server Id", qc.session.ServerID))
if err := qc.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
return nil
}
func (qc *QueryCoord) initSession() error {
qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
if qc.session == nil {
return fmt.Errorf("session is nil, the etcd client connection may have failed")
}
qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
Params.NodeID = uint64(qc.session.ServerID)
Params.SetLogger(typeutil.UniqueID(-1))
Params.SetLogger(qc.session.ServerID)
return nil
}
// Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler
func (qc *QueryCoord) Init() error {
log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
log.Debug("query coordinator start init")
//connect etcd
connectEtcdFn := func() error {
@ -117,6 +132,12 @@ func (qc *QueryCoord) Init() error {
}
var initError error
qc.initOnce.Do(func() {
err := qc.initSession()
if err != nil {
log.Error("QueryCoord init session failed", zap.Error(err))
initError = err
return
}
log.Debug("query coordinator try to connect etcd")
initError = retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
if initError != nil {
@ -194,8 +215,6 @@ func (qc *QueryCoord) Start() error {
Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now()
qc.UpdateStateCode(internalpb.StateCode_Healthy)
qc.loopWg.Add(1)
go qc.watchNodeLoop()
@ -207,14 +226,7 @@ func (qc *QueryCoord) Start() error {
go qc.loadBalanceSegmentLoop()
}
go qc.session.LivenessCheck(qc.loopCtx, func() {
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("Server Id", qc.session.ServerID))
if err := qc.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
qc.UpdateStateCode(internalpb.StateCode_Healthy)
return nil
}

View File

@ -90,10 +90,6 @@ func startQueryCoord(ctx context.Context) (*QueryCoord, error) {
coord.SetDataCoord(dataCoord)
coord.SetIndexCoord(indexCoord)
err = coord.Register()
if err != nil {
return nil, err
}
err = coord.Init()
if err != nil {
return nil, err
@ -103,6 +99,10 @@ func startQueryCoord(ctx context.Context) (*QueryCoord, error) {
if err != nil {
return nil, err
}
err = coord.Register()
if err != nil {
return nil, err
}
return coord, nil
}
@ -131,11 +131,11 @@ func startUnHealthyQueryCoord(ctx context.Context) (*QueryCoord, error) {
coord.SetRootCoord(rootCoord)
coord.SetDataCoord(dataCoord)
err = coord.Register()
err = coord.Init()
if err != nil {
return nil, err
}
err = coord.Init()
err = coord.Register()
if err != nil {
return nil, err
}

View File

@ -126,11 +126,21 @@ func NewQueryNode(ctx context.Context, factory msgstream.Factory) *QueryNode {
return node
}
func (node *QueryNode) initSession() error {
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
if node.session == nil {
return fmt.Errorf("session is nil, the etcd client connection may have failed")
}
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
Params.QueryNodeID = node.session.ServerID
Params.SetLogger(Params.QueryNodeID)
log.Debug("QueryNode", zap.Int64("nodeID", Params.QueryNodeID), zap.String("node address", node.session.Address))
return nil
}
// Register register query node at etcd
func (node *QueryNode) Register() error {
log.Debug("query node session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints))
node.session = sessionutil.NewSession(node.queryNodeLoopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
node.session.Init(typeutil.QueryNodeRole, Params.QueryNodeIP+":"+strconv.FormatInt(Params.QueryNodePort, 10), false)
node.session.Register()
// start liveness check
go node.session.LivenessCheck(node.queryNodeLoopCtx, func() {
log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
@ -141,13 +151,6 @@ func (node *QueryNode) Register() error {
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
Params.QueryNodeID = node.session.ServerID
Params.SetLogger(Params.QueryNodeID)
log.Debug("query nodeID", zap.Int64("nodeID", Params.QueryNodeID))
log.Debug("query node address", zap.String("address", node.session.Address))
// This param needs valid QueryNodeID
Params.initMsgChannelSubName()
//TODO Reset the logger
//Params.initLogCfg()
return nil
@ -174,6 +177,13 @@ func (node *QueryNode) Init() error {
var initError error = nil
node.initOnce.Do(func() {
//ctx := context.Background()
log.Debug("QueryNode session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints))
err := node.initSession()
if err != nil {
log.Error("QueryNode init session failed", zap.Error(err))
initError = err
return
}
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
if err != nil {
@ -186,7 +196,7 @@ func (node *QueryNode) Init() error {
zap.Any("EtcdEndpoints", Params.EtcdEndpoints),
zap.Any("MetaRootPath", Params.MetaRootPath),
)
err := retry.Do(node.queryNodeLoopCtx, connectEtcdFn, retry.Attempts(300))
err = retry.Do(node.queryNodeLoopCtx, connectEtcdFn, retry.Attempts(300))
if err != nil {
log.Debug("queryNode try to connect etcd failed", zap.Error(err))
initError = err
@ -241,6 +251,8 @@ func (node *QueryNode) Init() error {
zap.Any("IP", Params.QueryNodeIP),
zap.Any("Port", Params.QueryNodePort),
)
// This param needs valid QueryNodeID
Params.initMsgChannelSubName()
})
return initError

View File

@ -270,6 +270,9 @@ func TestQueryNode_register(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
err = node.initSession()
assert.NoError(t, err)
err = node.Register()
assert.NoError(t, err)
}

View File

@ -912,12 +912,25 @@ func (c *Core) ExpireMetaCache(ctx context.Context, collNames []string, ts typeu
// Register register rootcoord at etcd
func (c *Core) Register() error {
c.session.Register()
go c.session.LivenessCheck(c.ctx, func() {
log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
if err := c.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
return nil
}
func (c *Core) initSession() error {
c.session = sessionutil.NewSession(c.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
if c.session == nil {
return fmt.Errorf("session is nil, the etcd client connection may have failed")
}
c.session.Init(typeutil.RootCoordRole, Params.Address, true)
Params.SetLogger(typeutil.UniqueID(-1))
Params.SetLogger(c.session.ServerID)
return nil
}
@ -930,6 +943,11 @@ func (c *Core) Init() error {
}
}
c.initOnce.Do(func() {
if err := c.initSession(); err != nil {
initError = err
log.Error("RootCoord init session failed", zap.Error(err))
return
}
connectEtcdFn := func() error {
if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints, DialTimeout: 5 * time.Second}); initError != nil {
log.Error("RootCoord failed to new Etcd client", zap.Any("reason", initError))
@ -1166,14 +1184,6 @@ func (c *Core) Start() error {
go c.tsLoop()
go c.chanTimeTick.startWatch(&c.wg)
go c.checkFlushedSegmentsLoop()
go c.session.LivenessCheck(c.ctx, func() {
log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
if err := c.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
})
Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now()

View File

@ -436,11 +436,12 @@ func TestRootCoordInit(t *testing.T) {
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
err = core.Init()
assert.Nil(t, err)
err = core.Register()
assert.Nil(t, err)
// inject kvBaseCreate fail
core, err = NewCore(ctx, coreFactory)
require.Nil(t, err)
@ -450,14 +451,15 @@ func TestRootCoordInit(t *testing.T) {
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
core.kvBaseCreate = func(string) (kv.TxnKV, error) {
return nil, retry.Unrecoverable(errors.New("injected"))
}
err = core.Init()
assert.NotNil(t, err)
err = core.Register()
assert.Nil(t, err)
// inject metaKV create fail
core, err = NewCore(ctx, coreFactory)
require.Nil(t, err)
@ -467,8 +469,6 @@ func TestRootCoordInit(t *testing.T) {
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
core.kvBaseCreate = func(root string) (kv.TxnKV, error) {
if root == Params.MetaRootPath {
return nil, retry.Unrecoverable(errors.New("injected"))
@ -478,6 +478,9 @@ func TestRootCoordInit(t *testing.T) {
err = core.Init()
assert.NotNil(t, err)
err = core.Register()
assert.Nil(t, err)
// inject newSuffixSnapshot failure
core, err = NewCore(ctx, coreFactory)
require.Nil(t, err)
@ -487,14 +490,15 @@ func TestRootCoordInit(t *testing.T) {
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
core.kvBaseCreate = func(string) (kv.TxnKV, error) {
return nil, nil
}
err = core.Init()
assert.NotNil(t, err)
err = core.Register()
assert.Nil(t, err)
// inject newMetaTable failure
core, err = NewCore(ctx, coreFactory)
require.Nil(t, err)
@ -504,8 +508,6 @@ func TestRootCoordInit(t *testing.T) {
Params.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.MetaRootPath)
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
err = core.Register()
assert.Nil(t, err)
core.kvBaseCreate = func(string) (kv.TxnKV, error) {
kv := memkv.NewMemoryKV()
return &loadPrefixFailKV{TxnKV: kv}, nil
@ -513,6 +515,9 @@ func TestRootCoordInit(t *testing.T) {
err = core.Init()
assert.NotNil(t, err)
err = core.Register()
assert.Nil(t, err)
}
func TestRootCoord(t *testing.T) {
@ -543,9 +548,6 @@ func TestRootCoord(t *testing.T) {
Params.DmlChannelName = fmt.Sprintf("rootcoord-dml-test-%d", randVal)
Params.DeltaChannelName = fmt.Sprintf("rootcoord-delta-test-%d", randVal)
err = core.Register()
assert.Nil(t, err)
etcdCli, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints, DialTimeout: 5 * time.Second})
assert.Nil(t, err)
sessKey := path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot)
@ -624,6 +626,9 @@ func TestRootCoord(t *testing.T) {
err = core.Start()
assert.Nil(t, err)
err = core.Register()
assert.Nil(t, err)
time.Sleep(100 * time.Millisecond)
shardsNum := int32(8)
@ -2213,9 +2218,6 @@ func TestRootCoord2(t *testing.T) {
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
err = core.Register()
assert.Nil(t, err)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
assert.Nil(t, err)
@ -2247,6 +2249,9 @@ func TestRootCoord2(t *testing.T) {
err = core.Start()
assert.Nil(t, err)
err = core.Register()
assert.Nil(t, err)
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
@ -2481,9 +2486,6 @@ func TestCheckFlushedSegments(t *testing.T) {
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
err = core.Register()
assert.Nil(t, err)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
assert.Nil(t, err)
@ -2515,6 +2517,9 @@ func TestCheckFlushedSegments(t *testing.T) {
err = core.Start()
assert.Nil(t, err)
err = core.Register()
assert.Nil(t, err)
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,
@ -2638,9 +2643,6 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath)
Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal)
err = core.Register()
assert.Nil(t, err)
dm := &dataMock{randVal: randVal}
err = core.SetDataCoord(ctx, dm)
assert.Nil(t, err)
@ -2672,6 +2674,9 @@ func TestRootCoord_CheckZeroShardsNum(t *testing.T) {
err = core.Start()
assert.Nil(t, err)
err = core.Register()
assert.Nil(t, err)
m := map[string]interface{}{
"receiveBufSize": 1024,
"pulsarAddress": Params.PulsarAddress,

View File

@ -107,7 +107,6 @@ func NewSession(ctx context.Context, metaRoot string, etcdEndpoints []string) *S
// Init will initialize base struct of the Session, including ServerName, ServerID,
// Address, Exclusive. ServerID is obtained in getServerID.
// Finally it will process keepAliveResponse to keep alive with etcd.
func (s *Session) Init(serverName, address string, exclusive bool) {
s.ServerName = serverName
s.Address = address
@ -118,6 +117,10 @@ func (s *Session) Init(serverName, address string, exclusive bool) {
panic(err)
}
s.ServerID = serverID
}
// Register will process keepAliveResponse to keep alive with etcd.
func (s *Session) Register() {
ch, err := s.registerService()
if err != nil {
panic(err)

View File

@ -91,6 +91,7 @@ func TestInit(t *testing.T) {
s.Init("inittest", "testAddr", false)
assert.NotEqual(t, int64(0), s.leaseID)
assert.NotEqual(t, int64(0), s.ServerID)
s.Register()
sessions, _, err := s.GetSessions("inittest")
assert.Nil(t, err)
assert.Contains(t, sessions, "inittest-"+strconv.FormatInt(s.ServerID, 10))
@ -128,6 +129,7 @@ func TestUpdateSessions(t *testing.T) {
getIDFunc := func() {
singleS := NewSession(ctx, metaRoot, etcdEndpoints)
singleS.Init("test", "testAddr", false)
singleS.Register()
muList.Lock()
sList = append(sList, singleS)
muList.Unlock()