diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 53c52a63f0..98fe6ec988 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -282,7 +282,6 @@ func (s *Server) initSession() error { // Init change server state to Initializing func (s *Server) Init() error { var err error - s.stateCode.Store(commonpb.StateCode_Initializing) s.factory.Init(Params) if err = s.initSession(); err != nil { return err @@ -295,7 +294,6 @@ func (s *Server) Init() error { return err } s.startDataCoord() - s.stateCode.Store(commonpb.StateCode_Healthy) log.Info("DataCoord startup success") return nil } @@ -308,6 +306,7 @@ func (s *Server) Init() error { } func (s *Server) initDataCoord() error { + s.stateCode.Store(commonpb.StateCode_Initializing) var err error if err = s.initRootCoordClient(); err != nil { return err @@ -363,7 +362,6 @@ func (s *Server) initDataCoord() error { func (s *Server) Start() error { if !s.enableActiveStandBy { s.startDataCoord() - s.stateCode.Store(commonpb.StateCode_Healthy) log.Info("DataCoord startup successfully") } @@ -382,6 +380,7 @@ func (s *Server) startDataCoord() { // data while offline. log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes") s.reCollectSegmentStats(s.ctx) + s.stateCode.Store(commonpb.StateCode_Healthy) } func (s *Server) initCluster() error { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 9610ac3355..2f901939c6 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3549,10 +3549,16 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { err = svr.Init() assert.Nil(t, err) - err = svr.Start() - assert.Nil(t, err) + if Params.DataCoordCfg.EnableActiveStandby.GetAsBool() { + assert.Equal(t, commonpb.StateCode_StandBy, svr.stateCode.Load().(commonpb.StateCode)) + } else { + assert.Equal(t, commonpb.StateCode_Initializing, svr.stateCode.Load().(commonpb.StateCode)) + } err = svr.Register() assert.Nil(t, err) + err = svr.Start() + assert.Nil(t, err) + assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode)) // Stop channal watch state watcher in tests if svr.channelManager != nil && svr.channelManager.stopChecker != nil { diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index a3e7e3dafc..87f1eb2795 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -188,8 +188,6 @@ func (s *Server) init() error { } log.Debug("QueryCoord report DataCoord ready") - s.queryCoord.UpdateStateCode(commonpb.StateCode_Initializing) - log.Debug("QueryCoord", zap.Any("State", commonpb.StateCode_Initializing)) if err := s.queryCoord.Init(); err != nil { return err } diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index ef01c5ba9d..63b53d8068 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -164,9 +164,6 @@ func (s *Server) init() error { } log.Debug("grpc init done ...") - s.rootCoord.UpdateStateCode(commonpb.StateCode_Initializing) - log.Debug("RootCoord", zap.Any("State", commonpb.StateCode_Initializing)) - if s.newDataCoordClient != nil { log.Debug("RootCoord start to create DataCoord client") dataCoord := s.newDataCoordClient(rootcoord.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index f1ec76d709..383cefdb82 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -179,7 +179,6 @@ func (s *Server) Init() error { log.Error("QueryCoord init failed", zap.Error(err)) return err } - s.UpdateStateCode(commonpb.StateCode_Healthy) log.Info("QueryCoord startup success") return nil } @@ -192,6 +191,8 @@ func (s *Server) Init() error { } func (s *Server) initQueryCoord() error { + s.UpdateStateCode(commonpb.StateCode_Initializing) + log.Info("QueryCoord", zap.Any("State", commonpb.StateCode_Initializing)) // Init KV etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) s.kv = etcdKV @@ -356,7 +357,6 @@ func (s *Server) Start() error { if err := s.startQueryCoord(); err != nil { return err } - s.UpdateStateCode(commonpb.StateCode_Healthy) log.Info("QueryCoord started") } return nil @@ -386,6 +386,7 @@ func (s *Server) startQueryCoord() error { } s.startServerLoop() s.afterStart() + s.UpdateStateCode(commonpb.StateCode_Healthy) return nil } diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index e2b7db482e..df285ba04a 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -228,11 +228,13 @@ func (suite *ServerSuite) TestDisableActiveStandby() { suite.server, err = newQueryCoord() suite.NoError(err) + suite.Equal(commonpb.StateCode_Initializing, suite.server.status.Load().(commonpb.StateCode)) suite.hackServer() err = suite.server.Start() suite.NoError(err) err = suite.server.Register() suite.NoError(err) + suite.Equal(commonpb.StateCode_Healthy, suite.server.status.Load().(commonpb.StateCode)) states, err := suite.server.GetComponentStates(context.Background()) suite.NoError(err) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index a5b5570835..a23cb7f195 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -292,7 +292,6 @@ func (c *Core) Register() error { } }) - c.UpdateStateCode(commonpb.StateCode_Healthy) return nil } @@ -406,6 +405,7 @@ func (c *Core) initImportManager() error { } func (c *Core) initInternal() error { + c.UpdateStateCode(commonpb.StateCode_Initializing) c.initKVCreator() if err := c.initMetaTable(); err != nil { @@ -487,8 +487,6 @@ func (c *Core) Init() error { log.Error("RootCoord start failed", zap.Error(err)) } }) - - c.UpdateStateCode(commonpb.StateCode_Healthy) log.Info("RootCoord startup success") return err } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 72b71130b7..f8a101c7b9 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1415,11 +1415,63 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) { assert.NoError(t, err) err = core.Init() assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_StandBy, core.stateCode.Load().(commonpb.StateCode)) err = core.Start() assert.NoError(t, err) core.session.TriggerKill = false err = core.Register() assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode)) + resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_DescribeCollection, + MsgID: 0, + Timestamp: 0, + SourceID: paramtable.GetNodeID(), + }, + CollectionName: "unexist"}) + assert.NoError(t, err) + assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + err = core.Stop() + assert.NoError(t, err) +} + +// make sure the main functions work well when EnableActiveStandby=false +func TestRootcoord_DisableActiveStandby(t *testing.T) { + randVal := rand.Int() + Params.Init() + Params.BaseTable.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) + paramtable.Get().Save(Params.RootCoordCfg.EnableActiveStandby.Key, "false") + paramtable.Get().Save(Params.CommonCfg.RootCoordTimeTick.Key, fmt.Sprintf("rootcoord-time-tick-%d", randVal)) + paramtable.Get().Save(Params.CommonCfg.RootCoordStatistics.Key, fmt.Sprintf("rootcoord-statistics-%d", randVal)) + paramtable.Get().Save(Params.CommonCfg.RootCoordSubName.Key, fmt.Sprintf("subname-%d", randVal)) + paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, fmt.Sprintf("rootcoord-dml-test-%d", randVal)) + paramtable.Get().Save(Params.CommonCfg.RootCoordDelta.Key, fmt.Sprintf("rootcoord-delta-test-%d", randVal)) + + ctx := context.Background() + coreFactory := dependency.NewDefaultFactory(true) + etcdCli, err := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + assert.NoError(t, err) + defer etcdCli.Close() + core, err := NewCore(ctx, coreFactory) + core.etcdCli = etcdCli + assert.NoError(t, err) + err = core.Init() + assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_Initializing, core.stateCode.Load().(commonpb.StateCode)) + err = core.Start() + assert.NoError(t, err) + core.session.TriggerKill = false + err = core.Register() + assert.NoError(t, err) + assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode)) resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_DescribeCollection,