enhance: optimize CPU usage for CheckHealth requests (#35589)

issue: #35563
1. Use an internal health checker to monitor the cluster's health state,
storing the latest state on the coordinator node. The CheckHealth
request retrieves the cluster's health from this latest state on the
proxy sides, which enhances cluster stability.
2. Each health check will assess all collections and channels, with
detailed failure messages temporarily saved in the latest state.
3. Use CheckHealth request instead of the heavy GetMetrics request on
the querynode and datanode

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/38510/head
jaime 2024-12-17 11:02:45 +08:00 committed by GitHub
parent 2afe2eaf3e
commit 28fdbc4e30
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
51 changed files with 1298 additions and 483 deletions

View File

@ -382,7 +382,7 @@ queryCoord:
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
collectionObserverInterval: 200 # the interval of collection observer
checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for check health
updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
port: 19531 # TCP port of queryCoord

View File

@ -304,6 +304,22 @@ func (c *mockDataNodeClient) Stop() error {
return nil
}
func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
if c.state == commonpb.StateCode_Healthy {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
IsHealthy: true,
Reasons: []string{},
}, nil
} else {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe},
IsHealthy: false,
Reasons: []string{"fails"},
}, nil
}
}
type mockRootCoordClient struct {
state commonpb.StateCode
cnt atomic.Int64

View File

@ -52,6 +52,7 @@ import (
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
@ -167,6 +168,8 @@ type Server struct {
streamingCoord *streamingcoord.Server
metricsRequest *metricsinfo.MetricsRequest
healthChecker *healthcheck.Checker
}
type CollectionNameInfo struct {
@ -429,6 +432,8 @@ func (s *Server) initDataCoord() error {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return nil
}
@ -773,6 +778,8 @@ func (s *Server) startServerLoop() {
if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
s.syncSegmentsScheduler.Start()
}
s.healthChecker.Start()
}
func (s *Server) startTaskScheduler() {
@ -1099,6 +1106,9 @@ func (s *Server) Stop() error {
return nil
}
log.Info("datacoord server shutdown")
if s.healthChecker != nil {
s.healthChecker.Close()
}
s.garbageCollector.close()
log.Info("datacoord garbage collector stopped")

View File

@ -53,6 +53,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -2509,12 +2510,12 @@ func Test_CheckHealth(t *testing.T) {
return sm
}
getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager {
getChannelManager := func(findWatcherOk bool) ChannelManager {
channelManager := NewMockChannelManager(t)
if findWatcherOk {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil)
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe()
} else {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error"))
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe()
}
return channelManager
}
@ -2527,6 +2528,21 @@ func Test_CheckHealth(t *testing.T) {
2: nil,
}
newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server {
svr := &Server{
ctx: context.TODO(),
sessionManager: getSessionManager(isHealthy),
channelManager: getChannelManager(findWatcherOk),
meta: meta,
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn)
svr.healthChecker.Start()
time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker
return svr
}
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
@ -2538,9 +2554,8 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("data node health check is fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(false)
svr := newServer(false, true, &meta{channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -2549,11 +2564,8 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("check channel watched fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, false)
svr.meta = &meta{collections: collections}
svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -2562,11 +2574,7 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("check checkpoint fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
svr := newServer(true, true, &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@ -2576,8 +2584,8 @@ func Test_CheckHealth(t *testing.T) {
},
},
},
}
})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -2586,11 +2594,7 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("ok", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
svr := newServer(true, true, &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@ -2608,7 +2612,8 @@ func Test_CheckHealth(t *testing.T) {
},
},
},
}
})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)

View File

@ -35,7 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@ -1583,20 +1583,24 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
}, nil
}
err := s.sessionManager.CheckHealth(ctx)
if err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
latestCheckResult := s.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}
func (s *Server) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()
checkResults := s.sessionManager.CheckDNHealth(ctx)
for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched))
}
if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
for collectionID, failReason := range CheckCheckPointsHealth(s.meta) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed))
}
if err = CheckCheckPointsHealth(s.meta); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
return componentutil.CheckHealthRespWithErr(nil), nil
return checkResults
}
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {

View File

@ -19,6 +19,7 @@ package session
import (
"context"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@ -69,7 +71,7 @@ type DataNodeManager interface {
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
DropImport(nodeID int64, in *datapb.DropImportRequest) error
CheckHealth(ctx context.Context) error
CheckDNHealth(ctx context.Context) *healthcheck.Result
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
Close()
@ -507,28 +509,44 @@ func (c *DataNodeManagerImpl) DropImport(nodeID int64, in *datapb.DropImportRequ
return merr.CheckRPCCall(status, err)
}
func (c *DataNodeManagerImpl) CheckHealth(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)
func (c *DataNodeManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
ids := c.GetSessionIDs()
for _, nodeID := range ids {
nodeID := nodeID
group.Go(func() error {
cli, err := c.getClient(ctx, nodeID)
wg.Add(1)
go func() {
defer wg.Done()
datanodeClient, err := c.getClient(ctx, nodeID)
if err != nil {
return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
err = fmt.Errorf("failed to get node:%d: %v", nodeID, err)
return
}
sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return err
checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err)
wlock.Lock()
result.AppendUnhealthyClusterMsg(
healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
wlock.Unlock()
return
}
err = merr.AnalyzeState("DataNode", nodeID, sta)
return err
})
if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
wlock.Lock()
result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
wlock.Unlock()
}
}()
}
return group.Wait()
wg.Wait()
return result
}
func (c *DataNodeManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) {

View File

@ -6,6 +6,8 @@ import (
context "context"
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
healthcheck "github.com/milvus-io/milvus/internal/util/healthcheck"
mock "github.com/stretchr/testify/mock"
typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"
@ -117,48 +119,50 @@ func (_c *MockDataNodeManager_CheckChannelOperationProgress_Call) RunAndReturn(r
return _c
}
// CheckHealth provides a mock function with given fields: ctx
func (_m *MockDataNodeManager) CheckHealth(ctx context.Context) error {
// CheckDNHealth provides a mock function with given fields: ctx
func (_m *MockDataNodeManager) CheckDNHealth(ctx context.Context) *healthcheck.Result {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for CheckHealth")
panic("no return value specified for CheckDNHealth")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
var r0 *healthcheck.Result
if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*healthcheck.Result)
}
}
return r0
}
// MockDataNodeManager_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockDataNodeManager_CheckHealth_Call struct {
// MockDataNodeManager_CheckDNHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckDNHealth'
type MockDataNodeManager_CheckDNHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// CheckDNHealth is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockDataNodeManager_Expecter) CheckHealth(ctx interface{}) *MockDataNodeManager_CheckHealth_Call {
return &MockDataNodeManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)}
func (_e *MockDataNodeManager_Expecter) CheckDNHealth(ctx interface{}) *MockDataNodeManager_CheckDNHealth_Call {
return &MockDataNodeManager_CheckDNHealth_Call{Call: _e.mock.On("CheckDNHealth", ctx)}
}
func (_c *MockDataNodeManager_CheckHealth_Call) Run(run func(ctx context.Context)) *MockDataNodeManager_CheckHealth_Call {
func (_c *MockDataNodeManager_CheckDNHealth_Call) Run(run func(ctx context.Context)) *MockDataNodeManager_CheckDNHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockDataNodeManager_CheckHealth_Call) Return(_a0 error) *MockDataNodeManager_CheckHealth_Call {
func (_c *MockDataNodeManager_CheckDNHealth_Call) Return(_a0 *healthcheck.Result) *MockDataNodeManager_CheckDNHealth_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockDataNodeManager_CheckHealth_Call) RunAndReturn(run func(context.Context) error) *MockDataNodeManager_CheckHealth_Call {
func (_c *MockDataNodeManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) *healthcheck.Result) *MockDataNodeManager_CheckDNHealth_Call {
_c.Call.Return(run)
return _c
}

View File

@ -285,7 +285,8 @@ func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 {
return binlogIDs
}
func CheckCheckPointsHealth(meta *meta) error {
func CheckCheckPointsHealth(meta *meta) map[int64]string {
checkResult := make(map[int64]string)
for channel, cp := range meta.GetChannelCheckpoints() {
collectionID := funcutil.GetCollectionIDFromVChannel(channel)
if collectionID == -1 {
@ -299,31 +300,30 @@ func CheckCheckPointsHealth(meta *meta) error {
ts, _ := tsoutil.ParseTS(cp.Timestamp)
lag := time.Since(ts)
if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) {
return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes()))
checkResult[collectionID] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel)
}
}
return nil
return checkResult
}
func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error {
func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) map[int64]string {
collIDs := meta.ListCollections()
checkResult := make(map[int64]string)
for _, collID := range collIDs {
collInfo := meta.GetCollection(collID)
if collInfo == nil {
log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
log.RatedWarn(60, "collection info is nil, skip it", zap.Int64("collectionID", collID))
continue
}
for _, channelName := range collInfo.VChannelNames {
_, err := channelManager.FindWatcher(channelName)
if err != nil {
log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
zap.String("channelName", channelName), zap.Error(err))
return err
checkResult[collID] = fmt.Sprintf("channel:%s is not watched", channelName)
}
}
}
return nil
return checkResult
}
func createStorageConfig() *indexpb.StorageConfig {

View File

@ -52,7 +52,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
return nil, err
}
minFGChannel, minFGTt := util.GetRateCollector().GetMinFlowGraphTt()
minFGChannel, minFGTt := node.flowgraphManager.GetMinTTFlowGraph()
return &metricsinfo.DataNodeQuotaMetrics{
Hms: metricsinfo.HardwareMetrics{},
Rms: rms,

View File

@ -22,6 +22,7 @@ package datanode
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
@ -36,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -45,6 +47,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -573,3 +576,20 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo
log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID()))
return merr.Success(), nil
}
func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Status(err),
Reasons: []string{err.Error()},
}, nil
}
maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
minFGChannel, minFGTt := node.flowgraphManager.GetMinTTFlowGraph()
if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil {
msg := healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed)
return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil
}
return healthcheck.OK(), nil
}

View File

@ -110,6 +110,7 @@ func (s *DataNodeServicesSuite) SetupTest() {
s.Require().NoError(err)
s.node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode"))
s.node.flowgraphManager = pipeline.NewFlowgraphManager()
paramtable.SetNodeID(1)
}
@ -1161,6 +1162,41 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
})
}
func (s *DataNodeServicesSuite) TestCheckHealth() {
s.Run("node not healthy", func() {
s.SetupTest()
s.node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := s.node.CheckHealth(ctx, nil)
s.NoError(err)
s.False(merr.Ok(resp.GetStatus()))
s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
})
s.Run("exceeded timetick lag on pipeline", func() {
s.SetupTest()
fgm := pipeline.NewMockFlowgraphManager(s.T())
fgm.EXPECT().GetMinTTFlowGraph().Return("timetick-lag-ch", uint64(3600)).Once()
s.node.flowgraphManager = fgm
ctx := context.Background()
resp, err := s.node.CheckHealth(ctx, nil)
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.False(resp.GetIsHealthy())
s.NotEmpty(resp.Reasons)
})
s.Run("ok", func() {
s.SetupTest()
ctx := context.Background()
resp, err := s.node.CheckHealth(ctx, nil)
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.True(resp.GetIsHealthy())
s.Empty(resp.Reasons)
})
}
func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
s.Run("node not healthy", func() {
s.SetupTest()

View File

@ -281,3 +281,9 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact
return client.DropCompactionPlan(ctx, req)
})
}
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) {
return client.CheckHealth(ctx, req)
})
}

View File

@ -410,3 +410,7 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*
func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
return s.datanode.DropCompactionPlan(ctx, req)
}
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return s.datanode.CheckHealth(ctx, req)
}

View File

@ -185,6 +185,10 @@ func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropC
return m.status, m.err
}
func (m *MockDataNode) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{}, m.err
}
// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
paramtable.Init()

View File

@ -360,3 +360,9 @@ func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchReques
return client.DeleteBatch(ctx, req)
})
}
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) {
return client.CheckHealth(ctx, req)
})
}

View File

@ -394,3 +394,7 @@ func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commo
func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
return s.querynode.DeleteBatch(ctx, req)
}
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return s.querynode.CheckHealth(ctx, req)
}

View File

@ -22,7 +22,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -43,6 +42,7 @@ type FlowgraphManager interface {
GetFlowgraphCount() int
GetCollectionIDs() []int64
GetMinTTFlowGraph() (string, typeutil.Timestamp)
GetChannelsJSON() string
GetSegmentsJSON() string
Close()
@ -76,7 +76,6 @@ func (fm *fgManagerImpl) RemoveFlowgraph(channel string) {
fm.flowgraphs.Remove(channel)
metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
util.GetRateCollector().RemoveFlowGraphChannel(channel)
}
}
@ -120,6 +119,22 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
return collectionSet.Collect()
}
// GetMinTTFlowGraph returns the vchannel and minimal time tick of flow graphs.
func (fm *fgManagerImpl) GetMinTTFlowGraph() (string, typeutil.Timestamp) {
minTt := typeutil.MaxTimestamp
var channel string
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
if minTt > latestTimeTick {
minTt = latestTimeTick
channel = ch
}
return true
})
return channel, minTt
}
// GetChannelsJSON returns all channels in json format.
func (fm *fgManagerImpl) GetChannelsJSON() string {
var channels []*metricsinfo.Channel

View File

@ -309,6 +309,61 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s
return _c
}
// GetMinTTFlowGraph provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetMinTTFlowGraph() (string, uint64) {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetMinTTFlowGraph")
}
var r0 string
var r1 uint64
if rf, ok := ret.Get(0).(func() (string, uint64)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func() uint64); ok {
r1 = rf()
} else {
r1 = ret.Get(1).(uint64)
}
return r0, r1
}
// MockFlowgraphManager_GetMinTTFlowGraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMinTTFlowGraph'
type MockFlowgraphManager_GetMinTTFlowGraph_Call struct {
*mock.Call
}
// GetMinTTFlowGraph is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) GetMinTTFlowGraph() *MockFlowgraphManager_GetMinTTFlowGraph_Call {
return &MockFlowgraphManager_GetMinTTFlowGraph_Call{Call: _e.mock.On("GetMinTTFlowGraph")}
}
func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) Run(run func()) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) Return(_a0 string, _a1 uint64) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) RunAndReturn(run func() (string, uint64)) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
_c.Call.Return(run)
return _c
}
// GetSegmentsJSON provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetSegmentsJSON() string {
ret := _m.Called()

View File

@ -24,7 +24,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// rateCol is global RateCollector in DataNode.
@ -38,7 +37,6 @@ type RateCollector struct {
*ratelimitutil.RateCollector
flowGraphTtMu sync.Mutex
flowGraphTt map[string]typeutil.Timestamp
}
func initGlobalRateCollector() {
@ -75,35 +73,5 @@ func newRateCollector() (*RateCollector, error) {
}
return &RateCollector{
RateCollector: rc,
flowGraphTt: make(map[string]typeutil.Timestamp),
}, nil
}
// UpdateFlowGraphTt updates RateCollector's flow graph time tick.
func (r *RateCollector) UpdateFlowGraphTt(channel string, t typeutil.Timestamp) {
r.flowGraphTtMu.Lock()
defer r.flowGraphTtMu.Unlock()
r.flowGraphTt[channel] = t
}
// RemoveFlowGraphChannel removes channel from flowGraphTt.
func (r *RateCollector) RemoveFlowGraphChannel(channel string) {
r.flowGraphTtMu.Lock()
defer r.flowGraphTtMu.Unlock()
delete(r.flowGraphTt, channel)
}
// GetMinFlowGraphTt returns the vchannel and minimal time tick of flow graphs.
func (r *RateCollector) GetMinFlowGraphTt() (string, typeutil.Timestamp) {
r.flowGraphTtMu.Lock()
defer r.flowGraphTtMu.Unlock()
minTt := typeutil.MaxTimestamp
var channel string
for c, t := range r.flowGraphTt {
if minTt > t {
minTt = t
channel = c
}
}
return channel, minTt
}

View File

@ -1,42 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestRateCollector(t *testing.T) {
t.Run("test FlowGraphTt", func(t *testing.T) {
collector, err := newRateCollector()
assert.NoError(t, err)
c, minTt := collector.GetMinFlowGraphTt()
assert.Equal(t, "", c)
assert.Equal(t, typeutil.MaxTimestamp, minTt)
collector.UpdateFlowGraphTt("channel1", 100)
collector.UpdateFlowGraphTt("channel2", 200)
collector.UpdateFlowGraphTt("channel3", 50)
c, minTt = collector.GetMinFlowGraphTt()
assert.Equal(t, "channel3", c)
assert.Equal(t, typeutil.Timestamp(50), minTt)
})
}

View File

@ -91,6 +91,61 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func
return _c
}
// CheckHealth provides a mock function with given fields: _a0, _a1
func (_m *MockDataNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockDataNode_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.CheckHealthRequest
func (_e *MockDataNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckHealth_Call {
return &MockDataNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
}
func (_c *MockDataNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockDataNode_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
})
return _c
}
func (_c *MockDataNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNode_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockDataNode_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// CompactionV2 provides a mock function with given fields: _a0, _a1
func (_m *MockDataNode) CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -105,6 +105,76 @@ func (_c *MockDataNodeClient_CheckChannelOperationProgress_Call) RunAndReturn(ru
return _c
}
// CheckHealth provides a mock function with given fields: ctx, in, opts
func (_m *MockDataNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockDataNodeClient_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
// - in *milvuspb.CheckHealthRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckHealth_Call {
return &MockDataNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockDataNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockDataNodeClient_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...)
})
return _c
}
func (_c *MockDataNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNodeClient_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockDataNodeClient_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockDataNodeClient) Close() error {
ret := _m.Called()

View File

@ -30,6 +30,61 @@ func (_m *MockQueryNode) EXPECT() *MockQueryNode_Expecter {
return &MockQueryNode_Expecter{mock: &_m.Mock}
}
// CheckHealth provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockQueryNode_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.CheckHealthRequest
func (_e *MockQueryNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNode_CheckHealth_Call {
return &MockQueryNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
}
func (_c *MockQueryNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNode_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
})
return _c
}
func (_c *MockQueryNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNode_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNode_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// Delete provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -31,6 +31,76 @@ func (_m *MockQueryNodeClient) EXPECT() *MockQueryNodeClient_Expecter {
return &MockQueryNodeClient_Expecter{mock: &_m.Mock}
}
// CheckHealth provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockQueryNodeClient_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
// - in *milvuspb.CheckHealthRequest
// - opts ...grpc.CallOption
func (_e *MockQueryNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_CheckHealth_Call {
return &MockQueryNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockQueryNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...)
})
return _c
}
func (_c *MockQueryNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeClient_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeClient_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockQueryNodeClient) Close() error {
ret := _m.Called()

View File

@ -137,6 +137,8 @@ service DataNode {
rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {}
rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {}
rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {}
}
message FlushRequest {

View File

@ -175,7 +175,9 @@ service QueryNode {
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) {
}
}
rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {}
}
// --------------------QueryCoord grpc request and response proto------------------

View File

@ -29,6 +29,61 @@ func (_m *MockQueryNodeServer) EXPECT() *MockQueryNodeServer_Expecter {
return &MockQueryNodeServer_Expecter{mock: &_m.Mock}
}
// CheckHealth provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeServer_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockQueryNodeServer_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.CheckHealthRequest
func (_e *MockQueryNodeServer_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_CheckHealth_Call {
return &MockQueryNodeServer_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
}
func (_c *MockQueryNodeServer_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNodeServer_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
})
return _c
}
func (_c *MockQueryNodeServer_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeServer_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeServer_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeServer_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// Delete provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -55,6 +55,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
@ -138,6 +139,8 @@ type Server struct {
proxyClientManager proxyutil.ProxyClientManagerInterface
metricsRequest *metricsinfo.MetricsRequest
healthChecker *healthcheck.Checker
}
func NewQueryCoord(ctx context.Context) (*Server, error) {
@ -424,6 +427,8 @@ func (s *Server) initQueryCoord() error {
// Init load status cache
meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()
interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init querycoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return err
}
@ -567,6 +572,7 @@ func (s *Server) startQueryCoord() error {
s.startServerLoop()
s.afterStart()
s.healthChecker.Start()
s.UpdateStateCode(commonpb.StateCode_Healthy)
sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.GetServerID())
return nil
@ -605,7 +611,9 @@ func (s *Server) Stop() error {
// FOLLOW the dependence graph:
// job scheduler -> checker controller -> task scheduler -> dist controller -> cluster -> session
// observers -> dist controller
if s.healthChecker != nil {
s.healthChecker.Close()
}
if s.jobScheduler != nil {
log.Info("stop job scheduler...")
s.jobScheduler.Stop()

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
@ -35,7 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/job"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -913,16 +914,20 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
return &milvuspb.CheckHealthResponse{Status: merr.Status(err), IsHealthy: false, Reasons: []string{err.Error()}}, nil
}
errReasons, err := s.checkNodeHealth(ctx)
if err != nil || len(errReasons) != 0 {
return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil
}
latestCheckResult := s.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}
if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil {
log.Ctx(ctx).Warn("some collection is not queryable during health check", zap.Error(err))
}
func (s *Server) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()
return componentutil.CheckHealthRespWithErr(nil), nil
checkResults := s.broadcastCheckHealth(ctx)
for collectionID, failReason := range utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CollectionQueryable))
}
return checkResults
}
func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
@ -953,6 +958,39 @@ func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
return errReasons, err
}
func (s *Server) broadcastCheckHealth(ctx context.Context) *healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
for _, node := range s.nodeMgr.GetAll() {
node := node
wg.Add(1)
go func() {
defer wg.Done()
checkHealthResp, err := s.cluster.CheckHealth(ctx, node.ID())
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
err = fmt.Errorf("CheckHealth fails for querynode:%d, %w", node.ID(), err)
wlock.Lock()
result.AppendUnhealthyClusterMsg(
healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.ID(), err.Error(), healthcheck.NodeHealthCheck))
wlock.Unlock()
return
}
if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
wlock.Lock()
result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
wlock.Unlock()
}
}()
}
wg.Wait()
return result
}
func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.String("rgName", req.GetResourceGroup()),

View File

@ -47,6 +47,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
@ -170,6 +171,13 @@ func (suite *ServiceSuite) SetupTest() {
}))
suite.meta.ResourceManager.HandleNodeUp(context.TODO(), node)
}
suite.cluster = session.NewMockCluster(suite.T())
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(&milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
IsHealthy: true,
Reasons: []string{},
}, nil).Maybe()
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
@ -1627,6 +1635,9 @@ func (suite *ServiceSuite) TestCheckHealth() {
suite.loadAll()
ctx := context.Background()
server := suite.server
server.healthChecker = healthcheck.NewChecker(50*time.Millisecond, suite.server.healthCheckFn)
server.healthChecker.Start()
defer server.healthChecker.Close()
assertCheckHealthResult := func(isHealthy bool) {
resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@ -1639,28 +1650,38 @@ func (suite *ServiceSuite) TestCheckHealth() {
}
}
setNodeSate := func(state commonpb.StateCode) {
// Test for components state fail
suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Unset()
suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(
&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: state},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
},
nil).Maybe()
setNodeSate := func(isHealthy bool, isRPCFail bool) {
var resp *milvuspb.CheckHealthResponse
if isHealthy {
resp = healthcheck.OK()
} else {
resp = healthcheck.GetCheckHealthResponseFromClusterMsg(healthcheck.NewUnhealthyClusterMsg("dn", 1, "check fails", healthcheck.NodeHealthCheck))
}
resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
if isRPCFail {
resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_ForceDeny}
}
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Unset()
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(resp, nil).Maybe()
time.Sleep(1 * time.Second)
}
// Test for server is not healthy
server.UpdateStateCode(commonpb.StateCode_Initializing)
assertCheckHealthResult(false)
// Test for components state fail
setNodeSate(commonpb.StateCode_Abnormal)
// Test for check health has some error reasons
setNodeSate(false, false)
server.UpdateStateCode(commonpb.StateCode_Healthy)
assertCheckHealthResult(false)
// Test for check health rpc fail
setNodeSate(true, true)
server.UpdateStateCode(commonpb.StateCode_Healthy)
assertCheckHealthResult(false)
// Test for check load percentage fail
setNodeSate(commonpb.StateCode_Healthy)
setNodeSate(true, false)
assertCheckHealthResult(true)
// Test for check channel ok
@ -1682,7 +1703,14 @@ func (suite *ServiceSuite) TestCheckHealth() {
for _, node := range suite.nodes {
suite.nodeMgr.Stopping(node)
}
assertCheckHealthResult(true)
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key)
time.Sleep(1500 * time.Millisecond)
resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
suite.NoError(err)
suite.Equal(resp.IsHealthy, true)
suite.NotEmpty(resp.Reasons)
}
func (suite *ServiceSuite) TestGetShardLeaders() {

View File

@ -52,6 +52,7 @@ type Cluster interface {
GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error)
GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error)
CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error)
Start()
Stop()
}
@ -272,6 +273,20 @@ func (c *QueryCluster) send(ctx context.Context, nodeID int64, fn func(cli types
return nil
}
func (c *QueryCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) {
var (
resp *milvuspb.CheckHealthResponse
err error
)
err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) {
resp, err = cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
})
if err1 != nil {
return nil, err1
}
return resp, err
}
type clients struct {
sync.RWMutex
clients map[int64]types.QueryNodeClient // nodeID -> client

View File

@ -27,6 +27,61 @@ func (_m *MockCluster) EXPECT() *MockCluster_Expecter {
return &MockCluster_Expecter{mock: &_m.Mock}
}
// CheckHealth provides a mock function with given fields: ctx, nodeID
func (_m *MockCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) {
ret := _m.Called(ctx, nodeID)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(ctx, nodeID)
}
if rf, ok := ret.Get(0).(func(context.Context, int64) *milvuspb.CheckHealthResponse); ok {
r0 = rf(ctx, nodeID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
r1 = rf(ctx, nodeID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockCluster_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockCluster_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
func (_e *MockCluster_Expecter) CheckHealth(ctx interface{}, nodeID interface{}) *MockCluster_CheckHealth_Call {
return &MockCluster_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, nodeID)}
}
func (_c *MockCluster_CheckHealth_Call) Run(run func(ctx context.Context, nodeID int64)) *MockCluster_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64))
})
return _c
}
func (_c *MockCluster_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockCluster_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockCluster_CheckHealth_Call) RunAndReturn(run func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)) *MockCluster_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// GetComponentStates provides a mock function with given fields: ctx, nodeID
func (_m *MockCluster) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) {
ret := _m.Called(ctx, nodeID)

View File

@ -73,13 +73,13 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target
for segmentID, info := range segmentDist {
_, exist := leader.Segments[segmentID]
if !exist {
log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
log.RatedWarn(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
return merr.WrapErrSegmentLack(segmentID)
}
l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID
if l0WithWrongLocation {
log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID))
log.RatedWarn(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID))
return merr.WrapErrSegmentLack(segmentID)
}
}
@ -113,8 +113,6 @@ func GetShardLeadersWithChannels(ctx context.Context, m *meta.Meta, targetMgr me
) ([]*querypb.ShardLeadersList, error) {
ret := make([]*querypb.ShardLeadersList, 0)
for _, channel := range channels {
log := log.With(zap.String("channel", channel.GetChannelName()))
var channelErr error
leaders := dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName()))
if len(leaders) == 0 {
@ -132,7 +130,7 @@ func GetShardLeadersWithChannels(ctx context.Context, m *meta.Meta, targetMgr me
if len(readableLeaders) == 0 {
msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
log.Warn(msg, zap.Error(channelErr))
log.RatedWarn(60, msg, zap.Error(channelErr))
err := merr.WrapErrChannelNotAvailable(channel.GetChannelName(), channelErr.Error())
return nil, err
}
@ -185,8 +183,9 @@ func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr meta.TargetMan
}
// CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection
func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) map[int64]string {
maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second)
checkResult := make(map[int64]string)
for _, coll := range m.GetAllCollections(ctx) {
err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll)
// the collection is not queryable, if meet following conditions:
@ -194,15 +193,10 @@ func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta
// 2. Collection is not starting to release
// 3. The load percentage has not been updated in the last 5 minutes.
if err != nil && m.Exist(ctx, coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval {
log.Ctx(ctx).Warn("collection not querable",
zap.Int64("collectionID", coll.CollectionID),
zap.Time("lastUpdated", coll.UpdatedAt),
zap.Duration("maxInterval", maxInterval),
zap.Error(err))
return err
checkResult[coll.CollectionID] = err.Error()
}
}
return nil
return checkResult
}
// checkCollectionQueryable check all channels are watched and all segments are loaded for this collection

View File

@ -54,13 +54,7 @@ func getRateMetric() ([]metricsinfo.RateMetric, error) {
return rms, nil
}
// getQuotaMetrics returns QueryNodeQuotaMetrics.
func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) {
rms, err := getRateMetric()
if err != nil {
return nil, err
}
func getMinTSafe(node *QueryNode) (string, uint64) {
minTsafeChannel := ""
minTsafe := uint64(math.MaxUint64)
node.delegators.Range(func(channel string, delegator delegator.ShardDelegator) bool {
@ -71,7 +65,17 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
}
return true
})
return minTsafeChannel, minTsafe
}
// getQuotaMetrics returns QueryNodeQuotaMetrics.
func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) {
rms, err := getRateMetric()
if err != nil {
return nil, err
}
minTsafeChannel, minTsafe := getMinTSafe(node)
collections := node.manager.Collection.ListWithName()
nodeID := fmt.Sprint(node.GetNodeID())

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tasks"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
@ -54,6 +55,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -1384,6 +1386,25 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
return merr.Success(), nil
}
func (node *QueryNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if err := node.lifetime.Add(merr.IsHealthy); err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Status(err),
Reasons: []string{err.Error()},
}, nil
}
defer node.lifetime.Done()
maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
minTsafeChannel, minTsafe := getMinTSafe(node)
if err := ratelimitutil.CheckTimeTickDelay(minTsafeChannel, minTsafe, maxDelay); err != nil {
msg := healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed)
return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil
}
return healthcheck.OK(), nil
}
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {

View File

@ -98,7 +98,7 @@ func (suite *ServiceSuite) SetupSuite() {
paramtable.Init()
paramtable.Get().Save(paramtable.Get().CommonCfg.GCEnabled.Key, "false")
suite.rootPath = suite.T().Name()
suite.rootPath = path.Join("/tmp/milvus/test", suite.T().Name())
suite.collectionID = 111
suite.collectionName = "test-collection"
suite.partitionIDs = []int64{222}
@ -2222,6 +2222,44 @@ func (suite *ServiceSuite) TestLoadPartition() {
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
}
func (suite *ServiceSuite) TestCheckHealth() {
suite.Run("node not healthy", func() {
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := suite.node.CheckHealth(ctx, nil)
suite.NoError(err)
suite.False(merr.Ok(resp.GetStatus()))
suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
})
suite.Run("exceeded timetick lag on pipeline", func() {
sd1 := delegator.NewMockShardDelegator(suite.T())
sd1.EXPECT().GetTSafe().Return(100)
sd1.EXPECT().Close().Maybe()
suite.node.delegators.Insert("timetick-lag-ch", sd1)
defer suite.node.delegators.GetAndRemove("timetick-lag-ch")
ctx := context.Background()
suite.node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := suite.node.CheckHealth(ctx, nil)
suite.NoError(err)
suite.True(merr.Ok(resp.GetStatus()))
suite.False(resp.GetIsHealthy())
suite.NotEmpty(resp.Reasons)
})
suite.Run("ok", func() {
ctx := context.Background()
suite.node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := suite.node.CheckHealth(ctx, nil)
suite.NoError(err)
suite.True(merr.Ok(resp.GetStatus()))
suite.True(resp.GetIsHealthy())
suite.Empty(resp.Reasons)
})
}
func TestQueryNodeService(t *testing.T) {
suite.Run(t, new(ServiceSuite))
}

View File

@ -405,6 +405,7 @@ func newMockProxy() *mockProxy {
func newTestCore(opts ...Opt) *Core {
c := &Core{
ctx: context.TODO(),
metricsRequest: metricsinfo.NewMetricsRequest(),
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}},
}

View File

@ -32,7 +32,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -50,6 +49,7 @@ import (
tso2 "github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@ -130,6 +130,7 @@ type Core struct {
activateFunc func() error
metricsRequest *metricsinfo.MetricsRequest
healthChecker *healthcheck.Checker
}
// --------------------- function --------------------------
@ -500,6 +501,8 @@ func (c *Core) initInternal() error {
return err
}
interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
c.healthChecker = healthcheck.NewChecker(interval, c.healthCheckFn)
log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address))
return nil
}
@ -794,6 +797,7 @@ func (c *Core) startInternal() error {
}()
c.startServerLoop()
c.healthChecker.Start()
c.UpdateStateCode(commonpb.StateCode_Healthy)
sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
log.Info("rootcoord startup successfully")
@ -855,6 +859,10 @@ func (c *Core) revokeSession() {
// Stop stops rootCoord.
func (c *Core) Stop() error {
c.UpdateStateCode(commonpb.StateCode_Abnormal)
if c.healthChecker != nil {
c.healthChecker.Close()
}
c.stopExecutor()
c.stopScheduler()
if c.proxyWatcher != nil {
@ -3102,53 +3110,40 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest)
}, nil
}
group, ctx := errgroup.WithContext(ctx)
errs := typeutil.NewConcurrentSet[error]()
latestCheckResult := c.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}
func (c *Core) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()
proxyClients := c.proxyClientManager.GetProxyClients()
wg := sync.WaitGroup{}
lock := sync.Mutex{}
result := healthcheck.NewResult()
proxyClients.Range(func(key int64, value types.ProxyClient) bool {
nodeID := key
proxyClient := value
group.Go(func() error {
sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
errs.Insert(err)
return err
}
wg.Add(1)
go func() {
defer wg.Done()
resp, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
err = merr.AnalyzeComponentStateResp(typeutil.ProxyRole, nodeID, resp, err)
err = merr.AnalyzeState("Proxy", nodeID, sta)
lock.Lock()
defer lock.Unlock()
if err != nil {
errs.Insert(err)
result.AppendUnhealthyClusterMsg(healthcheck.NewUnhealthyClusterMsg(typeutil.ProxyRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
}
return err
})
}()
return true
})
maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
if maxDelay > 0 {
group.Go(func() error {
err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay)
if err != nil {
errs.Insert(err)
}
return err
})
}
err := group.Wait()
if err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Success(),
IsHealthy: false,
Reasons: lo.Map(errs.Collect(), func(e error, i int) string {
return err.Error()
}),
}, nil
}
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil
wg.Wait()
return result
}
func (c *Core) CreatePrivilegeGroup(ctx context.Context, in *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {

View File

@ -32,7 +32,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
@ -40,6 +39,7 @@ import (
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/util"
@ -49,7 +49,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tikv"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -1479,65 +1478,6 @@ func TestRootCoord_AlterCollection(t *testing.T) {
}
func TestRootCoord_CheckHealth(t *testing.T) {
getQueryCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) {
clusterTopology := metricsinfo.QueryClusterTopology{
ConnectedNodes: []metricsinfo.QueryNodeInfos{
{
QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{
MinFlowGraphChannel: "ch1",
MinFlowGraphTt: tt,
NumFlowGraph: 1,
},
},
},
},
}
resp, _ := metricsinfo.MarshalTopology(metricsinfo.QueryCoordTopology{Cluster: clusterTopology})
return &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, 0),
}, nil
}
getDataCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) {
clusterTopology := metricsinfo.DataClusterTopology{
ConnectedDataNodes: []metricsinfo.DataNodeInfos{
{
QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{
MinFlowGraphChannel: "ch1",
MinFlowGraphTt: tt,
NumFlowGraph: 1,
},
},
},
},
}
resp, _ := metricsinfo.MarshalTopology(metricsinfo.DataCoordTopology{Cluster: clusterTopology})
return &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, 0),
}, nil
}
querynodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0)
datanodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Minute), 0)
dcClient := mocks.NewMockDataCoordClient(t)
dcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getDataCoordMetricsFunc(datanodeTT))
qcClient := mocks.NewMockQueryCoordClient(t)
qcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getQueryCoordMetricsFunc(querynodeTT))
errDataCoordClient := mocks.NewMockDataCoordClient(t)
errDataCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
errQueryCoordClient := mocks.NewMockQueryCoordClient(t)
errQueryCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
c := newTestCore(withAbnormalCode())
@ -1547,25 +1487,13 @@ func TestRootCoord_CheckHealth(t *testing.T) {
assert.NotEmpty(t, resp.Reasons)
})
t.Run("ok with disabled tt lag configuration", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "-1")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(), withValidProxyManager())
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)
assert.Empty(t, resp.Reasons)
})
t.Run("proxy health check fail with invalid proxy", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(), withInvalidProxyManager())
c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn)
c.healthChecker.Start()
defer c.healthChecker.Close()
c := newTestCore(withHealthyCode(), withInvalidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
time.Sleep(50 * time.Millisecond)
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@ -1574,55 +1502,14 @@ func TestRootCoord_CheckHealth(t *testing.T) {
assert.NotEmpty(t, resp.Reasons)
})
t.Run("proxy health check fail with get metrics error", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
t.Run("ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(), withValidProxyManager())
c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn)
c.healthChecker.Start()
defer c.healthChecker.Close()
{
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(errQueryCoordClient))
time.Sleep(50 * time.Millisecond)
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
}
{
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(errDataCoordClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
}
})
t.Run("ok with tt lag exceeded", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "90")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
})
t.Run("ok with tt lag checking", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "600")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)

View File

@ -20,10 +20,8 @@ import (
"context"
"fmt"
"strconv"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/json"
@ -34,7 +32,6 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -284,97 +281,3 @@ func getProxyMetrics(ctx context.Context, proxies proxyutil.ProxyClientManagerIn
return ret, nil
}
func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, maxDelay time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, GetMetricsTimeout)
defer cancel()
now := time.Now()
group := &errgroup.Group{}
queryNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]()
dataNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]()
group.Go(func() error {
queryCoordTopology, err := getQueryCoordMetrics(ctx, queryCoord)
if err != nil {
return err
}
for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes {
qm := queryNodeMetric.QuotaMetrics
if qm != nil {
if qm.Fgm.NumFlowGraph > 0 && qm.Fgm.MinFlowGraphChannel != "" {
minTt, _ := tsoutil.ParseTS(qm.Fgm.MinFlowGraphTt)
delay := now.Sub(minTt)
if delay.Milliseconds() >= maxDelay.Milliseconds() {
queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay)
}
}
}
}
return nil
})
// get Data cluster metrics
group.Go(func() error {
dataCoordTopology, err := getDataCoordMetrics(ctx, dataCoord)
if err != nil {
return err
}
for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
dm := dataNodeMetric.QuotaMetrics
if dm != nil {
if dm.Fgm.NumFlowGraph > 0 && dm.Fgm.MinFlowGraphChannel != "" {
minTt, _ := tsoutil.ParseTS(dm.Fgm.MinFlowGraphTt)
delay := now.Sub(minTt)
if delay.Milliseconds() >= maxDelay.Milliseconds() {
dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay)
}
}
}
}
return nil
})
err := group.Wait()
if err != nil {
return err
}
var maxLagChannel string
var maxLag time.Duration
findMaxLagChannel := func(params ...*typeutil.ConcurrentMap[string, time.Duration]) {
for _, param := range params {
param.Range(func(k string, v time.Duration) bool {
if v > maxLag {
maxLag = v
maxLagChannel = k
}
return true
})
}
}
var errStr string
findMaxLagChannel(queryNodeTTDelay)
if maxLag > 0 && len(maxLagChannel) != 0 {
errStr = fmt.Sprintf("query max timetick lag:%s on channel:%s", maxLag, maxLagChannel)
}
maxLagChannel = ""
maxLag = 0
findMaxLagChannel(dataNodeTTDelay)
if maxLag > 0 && len(maxLagChannel) != 0 {
if errStr != "" {
errStr += ", "
}
errStr += fmt.Sprintf("data max timetick lag:%s on channel:%s", maxLag, maxLagChannel)
}
if errStr != "" {
return fmt.Errorf("max timetick lag execced threhold: %s", errStr)
}
return nil
}

View File

@ -84,17 +84,3 @@ func WaitForComponentHealthy[T interface {
}](ctx context.Context, client T, serviceName string, attempts uint, sleep time.Duration) error {
return WaitForComponentStates(ctx, client, serviceName, []commonpb.StateCode{commonpb.StateCode_Healthy}, attempts, sleep)
}
func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse {
if err != nil {
return CheckHealthRespWithErrMsg(err.Error())
}
return CheckHealthRespWithErrMsg()
}
func CheckHealthRespWithErrMsg(errMsg ...string) *milvuspb.CheckHealthResponse {
if len(errMsg) != 0 {
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errMsg}
}
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}
}

View File

@ -0,0 +1,276 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package healthcheck
import (
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// UnHealthyLevel represents the health level of a system.
type UnHealthyLevel int
const (
// Healthy means the system is operating normally.
Healthy UnHealthyLevel = iota
// Warning indicates minor issues that might escalate.
Warning
// Critical indicates major issues that need immediate attention.
Critical
// Fatal indicates system failure.
Fatal
)
// String returns the string representation of the UnHealthyLevel.
func (u UnHealthyLevel) String() string {
switch u {
case Healthy:
return "Healthy"
case Warning:
return "Warning"
case Critical:
return "Critical"
case Fatal:
return "Fatal"
default:
return "Unknown"
}
}
type Item int
const (
ChannelsWatched Item = iota
CheckpointLagExceed
CollectionQueryable
TimeTickLagExceed
NodeHealthCheck
)
func getUnhealthyLevel(item Item) UnHealthyLevel {
switch item {
case ChannelsWatched:
return Fatal
case CheckpointLagExceed:
return Fatal
case TimeTickLagExceed:
return Fatal
case NodeHealthCheck:
return Fatal
case CollectionQueryable:
return Critical
default:
panic(fmt.Sprintf("unknown health check item: %d", int(item)))
}
}
type Result struct {
UnhealthyClusterMsgs []*UnhealthyClusterMsg `json:"unhealthy_cluster_msgs"`
UnhealthyCollectionMsgs []*UnhealthyCollectionMsg `json:"unhealthy_collection_msgs"`
}
func NewResult() *Result {
return &Result{}
}
func (r *Result) AppendUnhealthyClusterMsg(unm *UnhealthyClusterMsg) {
r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, unm)
}
func (r *Result) AppendUnhealthyCollectionMsgs(udm *UnhealthyCollectionMsg) {
r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, udm)
}
func (r *Result) AppendResult(other *Result) {
if other == nil {
return
}
r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, other.UnhealthyClusterMsgs...)
r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, other.UnhealthyCollectionMsgs...)
}
func (r *Result) IsEmpty() bool {
return len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0
}
func (r *Result) IsHealthy() bool {
if len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 {
return true
}
for _, unm := range r.UnhealthyClusterMsgs {
if unm.Reason.UnhealthyLevel == Fatal {
return false
}
}
for _, ucm := range r.UnhealthyCollectionMsgs {
if ucm.Reason.UnhealthyLevel == Fatal {
return false
}
}
return true
}
type UnhealthyReason struct {
UnhealthyMsg string `json:"unhealthy_msg"`
UnhealthyLevel UnHealthyLevel `json:"unhealthy_level"`
}
type UnhealthyClusterMsg struct {
Role string `json:"role"`
NodeID int64 `json:"node_id"`
Reason *UnhealthyReason `json:"reason"`
}
func NewUnhealthyClusterMsg(role string, nodeID int64, unhealthyMsg string, item Item) *UnhealthyClusterMsg {
return &UnhealthyClusterMsg{
Role: role,
NodeID: nodeID,
Reason: &UnhealthyReason{
UnhealthyMsg: unhealthyMsg,
UnhealthyLevel: getUnhealthyLevel(item),
},
}
}
type UnhealthyCollectionMsg struct {
DatabaseID int64 `json:"database_id"`
CollectionID int64 `json:"collection_id"`
Reason *UnhealthyReason `json:"reason"`
}
func NewUnhealthyCollectionMsg(collectionID int64, unhealthyMsg string, item Item) *UnhealthyCollectionMsg {
return &UnhealthyCollectionMsg{
CollectionID: collectionID,
Reason: &UnhealthyReason{
UnhealthyMsg: unhealthyMsg,
UnhealthyLevel: getUnhealthyLevel(item),
},
}
}
type Checker struct {
sync.RWMutex
interval time.Duration
done chan struct{}
checkFn func() *Result
latestResult *Result
once sync.Once
}
func NewChecker(interval time.Duration, checkFn func() *Result) *Checker {
checker := &Checker{
interval: interval,
checkFn: checkFn,
latestResult: NewResult(),
done: make(chan struct{}, 1),
once: sync.Once{},
}
return checker
}
func (hc *Checker) Start() {
go func() {
ticker := time.NewTicker(hc.interval)
defer ticker.Stop()
log.Info("start health checker")
for {
select {
case <-ticker.C:
hc.Lock()
hc.latestResult = hc.checkFn()
hc.Unlock()
case <-hc.done:
log.Info("stop health checker")
return
}
}
}()
}
func (hc *Checker) GetLatestCheckResult() *Result {
hc.RLock()
defer hc.RUnlock()
return hc.latestResult
}
func (hc *Checker) Close() {
hc.once.Do(func() {
close(hc.done)
})
}
func GetHealthCheckResultFromResp(resp *milvuspb.CheckHealthResponse) *Result {
var r Result
if len(resp.Reasons) == 0 {
return &r
}
if len(resp.Reasons) > 1 {
log.Error("invalid check result", zap.Any("reasons", resp.Reasons))
return &r
}
err := json.Unmarshal([]byte(resp.Reasons[0]), &r)
if err != nil {
log.Error("unmarshal check result error", zap.String("error", err.Error()))
}
return &r
}
func GetCheckHealthResponseFromClusterMsg(msg ...*UnhealthyClusterMsg) *milvuspb.CheckHealthResponse {
r := &Result{UnhealthyClusterMsgs: msg}
reasons, err := json.Marshal(r)
if err != nil {
log.Error("marshal check result error", zap.String("error", err.Error()))
}
return &milvuspb.CheckHealthResponse{
Status: merr.Success(),
IsHealthy: r.IsHealthy(),
Reasons: []string{string(reasons)},
}
}
func GetCheckHealthResponseFromResult(checkResult *Result) *milvuspb.CheckHealthResponse {
if checkResult.IsEmpty() {
return OK()
}
reason, err := json.Marshal(checkResult)
if err != nil {
log.Error("marshal check result error", zap.String("error", err.Error()))
}
return &milvuspb.CheckHealthResponse{
Status: merr.Success(),
IsHealthy: checkResult.IsHealthy(),
Reasons: []string{string(reason)},
}
}
func OK() *milvuspb.CheckHealthResponse {
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}
}

View File

@ -0,0 +1,60 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package healthcheck
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func TestChecker(t *testing.T) {
expected1 := NewResult()
expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched))
expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched))
expected1.AppendUnhealthyCollectionMsgs(&UnhealthyCollectionMsg{
CollectionID: 1,
Reason: &UnhealthyReason{
UnhealthyMsg: "msg2",
UnhealthyLevel: Critical,
},
})
checkFn := func() *Result {
return expected1
}
checker := NewChecker(100*time.Millisecond, checkFn)
go checker.Start()
time.Sleep(150 * time.Millisecond)
actual1 := checker.GetLatestCheckResult()
assert.Equal(t, expected1, actual1)
assert.False(t, actual1.IsHealthy())
chr := GetCheckHealthResponseFromResult(actual1)
assert.Equal(t, merr.Success(), chr.Status)
assert.Equal(t, actual1.IsHealthy(), chr.IsHealthy)
assert.Equal(t, 1, len(chr.Reasons))
actualResult := GetHealthCheckResultFromResp(chr)
assert.Equal(t, actual1, actualResult)
checker.Close()
}

View File

@ -112,3 +112,7 @@ func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo
func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
func (m *GrpcDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{}, m.Err
}

View File

@ -134,6 +134,10 @@ func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.Delet
return &querypb.DeleteBatchResponse{}, m.Err
}
func (m *GrpcQueryNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{}, m.Err
}
func (m *GrpcQueryNodeClient) Close() error {
return m.Err
}

View File

@ -152,6 +152,10 @@ func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBa
return qn.QueryNode.DeleteBatch(ctx, in)
}
func (qn *qnServerWrapper) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return qn.QueryNode.CheckHealth(ctx, req)
}
func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient {
return &qnServerWrapper{
QueryNode: qn,

View File

@ -24,7 +24,6 @@ import (
"fmt"
"net"
"reflect"
"regexp"
"strconv"
"strings"
"time"
@ -291,13 +290,18 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri
}
func GetCollectionIDFromVChannel(vChannelName string) int64 {
re := regexp.MustCompile(`.*_(\d+)v\d+`)
matches := re.FindStringSubmatch(vChannelName)
if len(matches) > 1 {
number, err := strconv.ParseInt(matches[1], 0, 64)
if err == nil {
return number
}
end := strings.LastIndexByte(vChannelName, 'v')
if end <= 0 {
return -1
}
start := strings.LastIndexByte(vChannelName, '_')
if start <= 0 {
return -1
}
collectionIDStr := vChannelName[start+1 : end]
if collectionID, err := strconv.ParseInt(collectionIDStr, 0, 64); err == nil {
return collectionID
}
return -1
}

View File

@ -299,6 +299,13 @@ func IsHealthyOrStopping(stateCode commonpb.StateCode) error {
return CheckHealthy(stateCode)
}
func AnalyzeComponentStateResp(role string, nodeID int64, resp *milvuspb.ComponentStates, err error) error {
if err != nil {
return errors.Wrap(err, "service is unhealthy")
}
return AnalyzeState(role, nodeID, resp)
}
func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) error {
if err := Error(state.GetStatus()); err != nil {
return errors.Wrapf(err, "%s=%d not healthy", role, nodeID)

View File

@ -287,6 +287,9 @@ type commonConfig struct {
// Local RPC enabled for milvus internal communication when mix or standalone mode.
LocalRPCEnabled ParamItem `refreshable:"false"`
HealthCheckInterval ParamItem `refreshable:"true"`
HealthCheckRPCTimeout ParamItem `refreshable:"true"`
}
func (p *commonConfig) init(base *BaseTable) {
@ -946,6 +949,22 @@ This helps Milvus-CDC synchronize incremental data`,
Export: true,
}
p.LocalRPCEnabled.Init(base.mgr)
p.HealthCheckInterval = ParamItem{
Key: "common.healthcheck.interval.seconds",
Version: "2.4.8",
DefaultValue: "30",
Doc: `health check interval in seconds, default 30s`,
}
p.HealthCheckInterval.Init(base.mgr)
p.HealthCheckRPCTimeout = ParamItem{
Key: "common.healthcheck.timeout.seconds",
Version: "2.4.8",
DefaultValue: "10",
Doc: `RPC timeout for health check request`,
}
p.HealthCheckRPCTimeout.Init(base.mgr)
}
type gpuConfig struct {
@ -2250,9 +2269,9 @@ If this parameter is set false, Milvus simply searches the growing segments with
p.UpdateCollectionLoadStatusInterval = ParamItem{
Key: "queryCoord.updateCollectionLoadStatusInterval",
Version: "2.4.7",
DefaultValue: "5",
DefaultValue: "300",
PanicIfEmpty: true,
Doc: "5m, max interval of updating collection loaded status for check health",
Doc: "300s, max interval of updating collection loaded status for check health",
Export: true,
}

View File

@ -131,6 +131,11 @@ func TestComponentParam(t *testing.T) {
params.Save("common.gchelper.minimumGoGC", "80")
assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt())
params.Save("common.healthcheck.interval.seconds", "60")
assert.Equal(t, time.Second*60, Params.HealthCheckInterval.GetAsDuration(time.Second))
params.Save("common.healthcheck.timeout.seconds", "5")
assert.Equal(t, 5, Params.HealthCheckRPCTimeout.GetAsInt())
assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings()))
@ -326,8 +331,8 @@ func TestComponentParam(t *testing.T) {
checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt()
assert.Equal(t, 2000, checkHealthRPCTimeout)
updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
assert.Equal(t, updateInterval, time.Minute*5)
updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second)
assert.Equal(t, time.Second*300, updateInterval)
assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat())
params.Save("queryCoord.globalRowCountFactor", "0.4")

View File

@ -16,7 +16,13 @@
package ratelimitutil
import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
import (
"fmt"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
var QuotaErrorString = map[commonpb.ErrorCode]string{
commonpb.ErrorCode_ForceDeny: "access has been disabled by the administrator",
@ -28,3 +34,14 @@ var QuotaErrorString = map[commonpb.ErrorCode]string{
func GetQuotaErrorString(errCode commonpb.ErrorCode) string {
return QuotaErrorString[errCode]
}
func CheckTimeTickDelay(channel string, minTT uint64, maxDelay time.Duration) error {
if channel != "" && maxDelay > 0 {
minTt, _ := tsoutil.ParseTS(minTT)
delay := time.Since(minTt)
if delay.Milliseconds() >= maxDelay.Milliseconds() {
return fmt.Errorf("max timetick lag execced threhold, lag:%s on channel:%s", delay, channel)
}
}
return nil
}