Update component state to healthy after start (#22118)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/22128/head
cai.zhang 2023-02-10 17:40:32 +08:00 committed by GitHub
parent 2a19f4927d
commit 66b3566ac1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 68 additions and 15 deletions

View File

@ -282,7 +282,6 @@ func (s *Server) initSession() error {
// Init change server state to Initializing
func (s *Server) Init() error {
var err error
s.stateCode.Store(commonpb.StateCode_Initializing)
s.factory.Init(Params)
if err = s.initSession(); err != nil {
return err
@ -295,7 +294,6 @@ func (s *Server) Init() error {
return err
}
s.startDataCoord()
s.stateCode.Store(commonpb.StateCode_Healthy)
log.Info("DataCoord startup success")
return nil
}
@ -308,6 +306,7 @@ func (s *Server) Init() error {
}
func (s *Server) initDataCoord() error {
s.stateCode.Store(commonpb.StateCode_Initializing)
var err error
if err = s.initRootCoordClient(); err != nil {
return err
@ -363,7 +362,6 @@ func (s *Server) initDataCoord() error {
func (s *Server) Start() error {
if !s.enableActiveStandBy {
s.startDataCoord()
s.stateCode.Store(commonpb.StateCode_Healthy)
log.Info("DataCoord startup successfully")
}
@ -382,6 +380,7 @@ func (s *Server) startDataCoord() {
// data while offline.
log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes")
s.reCollectSegmentStats(s.ctx)
s.stateCode.Store(commonpb.StateCode_Healthy)
}
func (s *Server) initCluster() error {

View File

@ -3549,10 +3549,16 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
err = svr.Init()
assert.Nil(t, err)
err = svr.Start()
assert.Nil(t, err)
if Params.DataCoordCfg.EnableActiveStandby.GetAsBool() {
assert.Equal(t, commonpb.StateCode_StandBy, svr.stateCode.Load().(commonpb.StateCode))
} else {
assert.Equal(t, commonpb.StateCode_Initializing, svr.stateCode.Load().(commonpb.StateCode))
}
err = svr.Register()
assert.Nil(t, err)
err = svr.Start()
assert.Nil(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode))
// Stop channal watch state watcher in tests
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {

View File

@ -188,8 +188,6 @@ func (s *Server) init() error {
}
log.Debug("QueryCoord report DataCoord ready")
s.queryCoord.UpdateStateCode(commonpb.StateCode_Initializing)
log.Debug("QueryCoord", zap.Any("State", commonpb.StateCode_Initializing))
if err := s.queryCoord.Init(); err != nil {
return err
}

View File

@ -164,9 +164,6 @@ func (s *Server) init() error {
}
log.Debug("grpc init done ...")
s.rootCoord.UpdateStateCode(commonpb.StateCode_Initializing)
log.Debug("RootCoord", zap.Any("State", commonpb.StateCode_Initializing))
if s.newDataCoordClient != nil {
log.Debug("RootCoord start to create DataCoord client")
dataCoord := s.newDataCoordClient(rootcoord.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)

View File

@ -179,7 +179,6 @@ func (s *Server) Init() error {
log.Error("QueryCoord init failed", zap.Error(err))
return err
}
s.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("QueryCoord startup success")
return nil
}
@ -192,6 +191,8 @@ func (s *Server) Init() error {
}
func (s *Server) initQueryCoord() error {
s.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("QueryCoord", zap.Any("State", commonpb.StateCode_Initializing))
// Init KV
etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
s.kv = etcdKV
@ -356,7 +357,6 @@ func (s *Server) Start() error {
if err := s.startQueryCoord(); err != nil {
return err
}
s.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("QueryCoord started")
}
return nil
@ -386,6 +386,7 @@ func (s *Server) startQueryCoord() error {
}
s.startServerLoop()
s.afterStart()
s.UpdateStateCode(commonpb.StateCode_Healthy)
return nil
}

View File

@ -228,11 +228,13 @@ func (suite *ServerSuite) TestDisableActiveStandby() {
suite.server, err = newQueryCoord()
suite.NoError(err)
suite.Equal(commonpb.StateCode_Initializing, suite.server.status.Load().(commonpb.StateCode))
suite.hackServer()
err = suite.server.Start()
suite.NoError(err)
err = suite.server.Register()
suite.NoError(err)
suite.Equal(commonpb.StateCode_Healthy, suite.server.status.Load().(commonpb.StateCode))
states, err := suite.server.GetComponentStates(context.Background())
suite.NoError(err)

View File

@ -292,7 +292,6 @@ func (c *Core) Register() error {
}
})
c.UpdateStateCode(commonpb.StateCode_Healthy)
return nil
}
@ -406,6 +405,7 @@ func (c *Core) initImportManager() error {
}
func (c *Core) initInternal() error {
c.UpdateStateCode(commonpb.StateCode_Initializing)
c.initKVCreator()
if err := c.initMetaTable(); err != nil {
@ -487,8 +487,6 @@ func (c *Core) Init() error {
log.Error("RootCoord start failed", zap.Error(err))
}
})
c.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("RootCoord startup success")
return err
}

View File

@ -1415,11 +1415,63 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
assert.NoError(t, err)
err = core.Init()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_StandBy, core.stateCode.Load().(commonpb.StateCode))
err = core.Start()
assert.NoError(t, err)
core.session.TriggerKill = false
err = core.Register()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode))
resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
MsgID: 0,
Timestamp: 0,
SourceID: paramtable.GetNodeID(),
},
CollectionName: "unexist"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
err = core.Stop()
assert.NoError(t, err)
}
// make sure the main functions work well when EnableActiveStandby=false
func TestRootcoord_DisableActiveStandby(t *testing.T) {
randVal := rand.Int()
Params.Init()
Params.BaseTable.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal))
paramtable.Get().Save(Params.RootCoordCfg.EnableActiveStandby.Key, "false")
paramtable.Get().Save(Params.CommonCfg.RootCoordTimeTick.Key, fmt.Sprintf("rootcoord-time-tick-%d", randVal))
paramtable.Get().Save(Params.CommonCfg.RootCoordStatistics.Key, fmt.Sprintf("rootcoord-statistics-%d", randVal))
paramtable.Get().Save(Params.CommonCfg.RootCoordSubName.Key, fmt.Sprintf("subname-%d", randVal))
paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, fmt.Sprintf("rootcoord-dml-test-%d", randVal))
paramtable.Get().Save(Params.CommonCfg.RootCoordDelta.Key, fmt.Sprintf("rootcoord-delta-test-%d", randVal))
ctx := context.Background()
coreFactory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
Params.EtcdCfg.Endpoints.GetAsStrings(),
Params.EtcdCfg.EtcdTLSCert.GetValue(),
Params.EtcdCfg.EtcdTLSKey.GetValue(),
Params.EtcdCfg.EtcdTLSCACert.GetValue(),
Params.EtcdCfg.EtcdTLSMinVersion.GetValue())
assert.NoError(t, err)
defer etcdCli.Close()
core, err := NewCore(ctx, coreFactory)
core.etcdCli = etcdCli
assert.NoError(t, err)
err = core.Init()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Initializing, core.stateCode.Load().(commonpb.StateCode))
err = core.Start()
assert.NoError(t, err)
core.session.TriggerKill = false
err = core.Register()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode))
resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,