mirror of https://github.com/milvus-io/milvus.git
Update healthz to support standby state and fix rootcoord active-standby (#21130)
Signed-off-by: wayblink <anyang.wang@zilliz.com> Signed-off-by: wayblink <anyang.wang@zilliz.com>pull/21147/head
parent
3a6eff32f4
commit
38e699482d
|
@ -75,7 +75,7 @@ func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
|
|||
Name: in.GetName(),
|
||||
Code: code,
|
||||
})
|
||||
if code != commonpb.StateCode_Healthy {
|
||||
if code != commonpb.StateCode_Healthy && code != commonpb.StateCode_StandBy {
|
||||
resp.State = fmt.Sprintf("component %s state is %s", in.GetName(), code.String())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,6 +57,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/errorutil"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/logutil"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
|
@ -295,9 +296,6 @@ func (c *Core) Register() error {
|
|||
c.session.Register()
|
||||
if c.enableActiveStandBy {
|
||||
c.session.ProcessActiveStandBy(c.activateFunc)
|
||||
} else {
|
||||
c.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
log.Info("RootCoord start successfully ", zap.String("State Code", commonpb.StateCode_Healthy.String()))
|
||||
}
|
||||
log.Info("RootCoord Register Finished")
|
||||
go c.session.LivenessCheck(c.ctx, func() {
|
||||
|
@ -618,16 +616,6 @@ func (c *Core) startInternal() error {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
c.wg.Add(6)
|
||||
go c.startTimeTickLoop()
|
||||
go c.tsLoop()
|
||||
go c.chanTimeTick.startWatch(&c.wg)
|
||||
go c.importManager.cleanupLoop(&c.wg)
|
||||
go c.importManager.sendOutTasksLoop(&c.wg)
|
||||
go c.importManager.flipTaskStateLoop(&c.wg)
|
||||
Params.RootCoordCfg.CreatedTime = time.Now()
|
||||
Params.RootCoordCfg.UpdatedTime = time.Now()
|
||||
|
||||
if Params.QuotaConfig.QuotaAndLimitsEnabled {
|
||||
go c.quotaCenter.run()
|
||||
}
|
||||
|
@ -642,13 +630,29 @@ func (c *Core) startInternal() error {
|
|||
c.activateFunc = func() {
|
||||
// todo to complete
|
||||
log.Info("rootcoord switch from standby to active, activating")
|
||||
c.startServerLoop()
|
||||
c.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
}
|
||||
c.UpdateStateCode(commonpb.StateCode_StandBy)
|
||||
logutil.Logger(c.ctx).Info("rootcoord enter standby mode successfully")
|
||||
} else {
|
||||
c.startServerLoop()
|
||||
c.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
logutil.Logger(c.ctx).Info("rootcoord startup successfully")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Core) startServerLoop() {
|
||||
c.wg.Add(6)
|
||||
go c.startTimeTickLoop()
|
||||
go c.tsLoop()
|
||||
go c.chanTimeTick.startWatch(&c.wg)
|
||||
go c.importManager.cleanupLoop(&c.wg)
|
||||
go c.importManager.sendOutTasksLoop(&c.wg)
|
||||
go c.importManager.flipTaskStateLoop(&c.wg)
|
||||
}
|
||||
|
||||
// Start starts RootCoord.
|
||||
func (c *Core) Start() error {
|
||||
var err error
|
||||
|
|
Loading…
Reference in New Issue