mirror of https://github.com/milvus-io/milvus.git
parent 4fe363c4b2
commit 4336ed8609
@@ -22,6 +22,7 @@ import (
	"os"
	"sync"
	"syscall"
	"time"

	"github.com/cockroachdb/errors"
	clientv3 "go.etcd.io/etcd/client/v3"
@@ -112,13 +113,16 @@ type Server struct {
 	// Active-standby
 	enableActiveStandBy bool
 	activateFunc        func() error
+
+	nodeUpEventChan chan int64
 }

 func NewQueryCoord(ctx context.Context) (*Server, error) {
 	ctx, cancel := context.WithCancel(ctx)
 	server := &Server{
-		ctx:    ctx,
-		cancel: cancel,
+		ctx:             ctx,
+		cancel:          cancel,
+		nodeUpEventChan: make(chan int64, 10240),
 	}
 	server.UpdateStateCode(commonpb.StateCode_Abnormal)
 	server.queryNodeCreator = session.DefaultQueryNodeCreator
@@ -387,7 +391,9 @@ func (s *Server) startQueryCoord() error {
 	for _, node := range sessions {
 		s.handleNodeUp(node.ServerID)
 	}
-	s.wg.Add(1)
+
+	s.wg.Add(2)
+	go s.handleNodeUpLoop()
 	go s.watchNodes(revision)

 	log.Info("start recovering dist and target")
@@ -621,9 +627,7 @@ func (s *Server) watchNodes(revision int64) {
				zap.String("nodeAddr", addr),
			)
			s.nodeMgr.Add(session.NewNodeInfo(nodeID, addr))
-			s.handleNodeUp(nodeID)
-			s.metricsCacheManager.InvalidateSystemInfoMetrics()
-			s.checkerController.Check()
+			s.nodeUpEventChan <- nodeID

		case sessionutil.SessionUpdateEvent:
			nodeID := event.Session.ServerID
@@ -646,6 +650,44 @@ func (s *Server) watchNodes(revision int64) {
 		}
 	}
 }
+
+func (s *Server) handleNodeUpLoop() {
+	log := log.Ctx(s.ctx).WithRateGroup("qcv2.Server", 1, 60)
+	defer s.wg.Done()
+	// small check interval value can reduce the latency of node up
+	ticker := time.NewTicker(Params.QueryCoordCfg.CheckHealthInterval.GetAsDuration(time.Millisecond))
+	defer ticker.Stop()
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+
+		case <-ticker.C:
+			ctx, cancel := context.WithTimeout(s.ctx, Params.QueryCoordCfg.CheckHealthRPCTimeout.GetAsDuration(time.Millisecond))
+			defer cancel()
+			reasons, err := s.checkNodeHealth(ctx)
+			if err != nil {
+				log.RatedWarn(10, "unhealthy node exist, node up will be delayed",
+					zap.Int("delayedNodeUpEvents", len(s.nodeUpEventChan)),
+					zap.Int("unhealthyNodeNum", len(reasons)),
+					zap.Strings("unhealthyReason", reasons))
+				return
+			}
+			for len(s.nodeUpEventChan) > 0 {
+				nodeID := <-s.nodeUpEventChan
+				if s.nodeMgr.Get(nodeID) != nil {
+					// only if all nodes are healthy, node up event will be handled
+					s.handleNodeUp(nodeID)
+					s.metricsCacheManager.InvalidateSystemInfoMetrics()
+					s.checkerController.Check()
+				} else {
+					log.Warn("node already down",
+						zap.Int64("nodeID", nodeID))
+				}
+			}
+		}
+	}
+}

 func (s *Server) handleNodeUp(node int64) {
 	log := log.With(zap.Int64("nodeID", node))
 	s.taskScheduler.AddExecutor(node)
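Taken together, the hunks above split node-up handling in two: watchNodes now only enqueues the server ID on the buffered nodeUpEventChan, and handleNodeUpLoop drains that queue on each tick, but only after checkNodeHealth reports no unhealthy nodes. Below is a minimal, self-contained sketch of that gating pattern under hypothetical names (coordinator, checkHealth, onNodeUp are illustrative, not the Milvus types); unlike the code above, it simply skips to the next tick when the check fails.

// Minimal sketch of the "queue node-up events, drain only when healthy" pattern.
// All names here are hypothetical stand-ins, not the Milvus implementation.
package main

import (
	"context"
	"fmt"
	"time"
)

type coordinator struct {
	nodeUpEvents chan int64 // buffered queue of node IDs; a watch loop would normally fill this
}

// checkHealth stands in for an RPC fan-out to every known node.
func (c *coordinator) checkHealth(ctx context.Context) error {
	return nil // pretend every node answers and is healthy
}

func (c *coordinator) onNodeUp(nodeID int64) {
	fmt.Println("handling node up:", nodeID)
}

// handleNodeUpLoop drains queued node-up events only when the cluster is healthy.
func (c *coordinator) handleNodeUpLoop(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := c.checkHealth(ctx); err != nil {
				continue // unhealthy nodes exist: keep events queued for the next tick
			}
			for len(c.nodeUpEvents) > 0 {
				c.onNodeUp(<-c.nodeUpEvents)
			}
		}
	}
}

func main() {
	c := &coordinator{nodeUpEvents: make(chan int64, 16)}
	c.nodeUpEvents <- 100 // a watch loop would normally enqueue this on SessionAddEvent
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	c.handleNodeUpLoop(ctx, 50*time.Millisecond)
}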
@@ -171,19 +171,62 @@ func (suite *ServerSuite) TestRecoverFailed() {
 }

 func (suite *ServerSuite) TestNodeUp() {
-	newNode := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 100)
-	newNode.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Status(nil)}, nil)
-	err := newNode.Start()
+	node1 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 100)
+	node1.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Status(nil)}, nil)
+	err := node1.Start()
 	suite.NoError(err)
-	defer newNode.Stop()
+	defer node1.Stop()

 	suite.Eventually(func() bool {
-		node := suite.server.nodeMgr.Get(newNode.ID)
+		node := suite.server.nodeMgr.Get(node1.ID)
 		if node == nil {
 			return false
 		}
 		for _, collection := range suite.collections {
-			replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, newNode.ID)
+			replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node1.ID)
 			if replica == nil {
 				return false
 			}
 		}
 		return true
 	}, 5*time.Second, time.Second)
+
+	// mock node1 lost connection
+	fakeLostConnectionErr := errors.New("fake lost connection error")
+	node1.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(nil, fakeLostConnectionErr)
+
+	node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 101)
+	node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Status(nil)}, nil).Maybe()
+	err = node2.Start()
+	suite.NoError(err)
+	defer node2.Stop()
+
+	// expect node2 won't be add to qc, due to unhealthy nodes exist
+	suite.Eventually(func() bool {
+		node := suite.server.nodeMgr.Get(node2.ID)
+		if node == nil {
+			return false
+		}
+		for _, collection := range suite.collections {
+			replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node2.ID)
+			if replica == nil {
+				return true
+			}
+		}
+		return false
+	}, 5*time.Second, time.Second)
+
+	// mock node1 down, so no unhealthy nodes exist
+	suite.server.nodeMgr.Remove(node1.ID)
+
+	// expect node2 will be add to qc
+	suite.Eventually(func() bool {
+		node := suite.server.nodeMgr.Get(node2.ID)
+		if node == nil {
+			return false
+		}
+		for _, collection := range suite.collections {
+			replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node2.ID)
+			if replica == nil {
+				return false
+			}
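The test above leans on testify's Eventually, which polls a condition at a fixed tick until it returns true or the overall timeout expires; the trailing arguments `5*time.Second, time.Second` are the timeout and the tick. A small, hedged sketch of that idiom in isolation (plain assert package rather than the suite wrapper used above):

// Sketch of the polling assertion idiom; run with `go test`.
package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestEventuallyIdiom(t *testing.T) {
	start := time.Now()
	// The condition is re-evaluated once per tick; here it flips to true after
	// roughly two seconds, well inside the 5s budget, so the assertion passes.
	assert.Eventually(t, func() bool {
		return time.Since(start) > 2*time.Second
	}, 5*time.Second, time.Second)
}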
@@ -946,8 +946,17 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
 		return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil
 	}

+	errReasons, err := s.checkNodeHealth(ctx)
+	if err != nil || len(errReasons) != 0 {
+		return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: errReasons}, nil
+	}
+
+	return &milvuspb.CheckHealthResponse{IsHealthy: true, Reasons: errReasons}, nil
+}
+
+func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
 	group, ctx := errgroup.WithContext(ctx)
-	errReasons := make([]string, 0, len(s.nodeMgr.GetAll()))
+	errReasons := make([]string, 0)

 	mu := &sync.Mutex{}
 	for _, node := range s.nodeMgr.GetAll() {
@@ -965,11 +974,8 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
 	}

 	err := group.Wait()
-	if err != nil || len(errReasons) != 0 {
-		return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: errReasons}, nil
-	}
-
-	return &milvuspb.CheckHealthResponse{IsHealthy: true, Reasons: errReasons}, nil
+	return errReasons, err
 }

 func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
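The hunks above only show the edges of checkNodeHealth; the per-node RPC inside the loop body is elided. The sketch below illustrates the general shape implied by the surrounding lines (errgroup.WithContext, a mutex-guarded reasons slice, group.Wait) with hypothetical stand-ins pingNode and nodeInfo; it is not the actual Milvus call.

// Hedged sketch of an errgroup fan-out health check: one goroutine per node,
// failure reasons appended to a shared slice under a mutex.
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

type nodeInfo struct{ ID int64 }

// pingNode stands in for the per-node health RPC.
func pingNode(ctx context.Context, n nodeInfo) error {
	return nil // pretend the node answers within the timeout
}

func checkNodeHealth(ctx context.Context, nodes []nodeInfo) ([]string, error) {
	group, ctx := errgroup.WithContext(ctx)
	reasons := make([]string, 0)
	mu := &sync.Mutex{}

	for _, node := range nodes {
		node := node // capture the loop variable for the goroutine (pre-Go 1.22 style)
		group.Go(func() error {
			if err := pingNode(ctx, node); err != nil {
				mu.Lock()
				reasons = append(reasons, fmt.Sprintf("query node %d unhealthy: %v", node.ID, err))
				mu.Unlock()
			}
			return nil
		})
	}
	return reasons, group.Wait()
}

func main() {
	reasons, err := checkNodeHealth(context.Background(), []nodeInfo{{ID: 100}, {ID: 101}})
	fmt.Println(reasons, err)
}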
@@ -1127,6 +1127,8 @@ type queryCoordConfig struct {
 	CheckNodeInReplicaInterval ParamItem `refreshable:"false"`
 	CheckResourceGroupInterval ParamItem `refreshable:"false"`
 	EnableRGAutoRecover        ParamItem `refreshable:"true"`
+	CheckHealthInterval        ParamItem `refreshable:"false"`
+	CheckHealthRPCTimeout      ParamItem `refreshable:"true"`
 }

 func (p *queryCoordConfig) init(base *BaseTable) {
@@ -1352,6 +1354,26 @@ func (p *queryCoordConfig) init(base *BaseTable) {
 		PanicIfEmpty: true,
 	}
 	p.EnableRGAutoRecover.Init(base.mgr)
+
+	p.CheckHealthInterval = ParamItem{
+		Key:          "queryCoord.checkHealthInterval",
+		Version:      "2.2.7",
+		DefaultValue: "3000",
+		PanicIfEmpty: true,
+		Doc:          "3s, the interval when query coord try to check health of query node",
+		Export:       true,
+	}
+	p.CheckHealthInterval.Init(base.mgr)
+
+	p.CheckHealthRPCTimeout = ParamItem{
+		Key:          "queryCoord.checkHealthRPCTimeout",
+		Version:      "2.2.7",
+		DefaultValue: "100",
+		PanicIfEmpty: true,
+		Doc:          "100ms, the timeout of check health rpc to query node",
+		Export:       true,
+	}
+	p.CheckHealthRPCTimeout.Init(base.mgr)
 }

 // /////////////////////////////////////////////////////////////////////////////
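Both new parameters are stored as millisecond strings and read back with GetAsDuration(time.Millisecond), as handleNodeUpLoop does above. A tiny sketch (asDuration is a hypothetical helper mimicking that conversion, not the paramtable API) confirms the documented defaults of 3s and 100ms:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// asDuration converts a millisecond string into a time.Duration,
// the same interpretation GetAsDuration(time.Millisecond) applies.
func asDuration(ms string) time.Duration {
	v, _ := strconv.ParseInt(ms, 10, 64)
	return time.Duration(v) * time.Millisecond
}

func main() {
	fmt.Println(asDuration("3000")) // 3s    -> queryCoord.checkHealthInterval default
	fmt.Println(asDuration("100"))  // 100ms -> queryCoord.checkHealthRPCTimeout default
}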
@@ -267,6 +267,11 @@ func TestComponentParam(t *testing.T) {
 		params.Save("queryCoord.enableRGAutoRecover", "false")
 		enableResourceGroupAutoRecover = Params.EnableRGAutoRecover
 		assert.Equal(t, false, enableResourceGroupAutoRecover.GetAsBool())
+
+		checkHealthInterval := Params.CheckHealthInterval.GetAsInt()
+		assert.Equal(t, 3000, checkHealthInterval)
+		checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt()
+		assert.Equal(t, 100, checkHealthRPCTimeout)
 	})

 	t.Run("test queryNodeConfig", func(t *testing.T) {