fix: revert optimize CPU usage for CheckHealth requests (#35589) (#38555)

issue: #35563

Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/38231/head
jaime 2024-12-19 00:38:45 +08:00 committed by GitHub
parent 134d06f7e6
commit 78438ef41e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
51 changed files with 485 additions and 1301 deletions

View File

@ -382,7 +382,7 @@ queryCoord:
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
collectionObserverInterval: 200 # the interval of collection observer
checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health
updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for check health
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
port: 19531 # TCP port of queryCoord

View File

@ -304,22 +304,6 @@ func (c *mockDataNodeClient) Stop() error {
return nil
}
func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
if c.state == commonpb.StateCode_Healthy {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
IsHealthy: true,
Reasons: []string{},
}, nil
} else {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe},
IsHealthy: false,
Reasons: []string{"fails"},
}, nil
}
}
type mockRootCoordClient struct {
state commonpb.StateCode
cnt atomic.Int64

View File

@ -52,7 +52,6 @@ import (
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
@ -168,8 +167,6 @@ type Server struct {
streamingCoord *streamingcoord.Server
metricsRequest *metricsinfo.MetricsRequest
healthChecker *healthcheck.Checker
}
type CollectionNameInfo struct {
@ -432,8 +429,6 @@ func (s *Server) initDataCoord() error {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return nil
}
@ -778,8 +773,6 @@ func (s *Server) startServerLoop() {
if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
s.syncSegmentsScheduler.Start()
}
s.healthChecker.Start()
}
func (s *Server) startTaskScheduler() {
@ -1106,9 +1099,6 @@ func (s *Server) Stop() error {
return nil
}
log.Info("datacoord server shutdown")
if s.healthChecker != nil {
s.healthChecker.Close()
}
s.garbageCollector.close()
log.Info("datacoord garbage collector stopped")

View File

@ -54,7 +54,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -2536,12 +2535,12 @@ func Test_CheckHealth(t *testing.T) {
return sm
}
getChannelManager := func(findWatcherOk bool) ChannelManager {
getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager {
channelManager := NewMockChannelManager(t)
if findWatcherOk {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe()
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil)
} else {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe()
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error"))
}
return channelManager
}
@ -2554,21 +2553,6 @@ func Test_CheckHealth(t *testing.T) {
2: nil,
}
newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server {
svr := &Server{
ctx: context.TODO(),
sessionManager: getSessionManager(isHealthy),
channelManager: getChannelManager(findWatcherOk),
meta: meta,
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn)
svr.healthChecker.Start()
time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker
return svr
}
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
@ -2580,8 +2564,9 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("data node health check is fail", func(t *testing.T) {
svr := newServer(false, true, &meta{channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(false)
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -2590,8 +2575,11 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("check channel watched fail", func(t *testing.T) {
svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, false)
svr.meta = &meta{collections: collections}
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -2600,7 +2588,11 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("check checkpoint fail", func(t *testing.T) {
svr := newServer(true, true, &meta{
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@ -2610,8 +2602,8 @@ func Test_CheckHealth(t *testing.T) {
},
},
},
})
defer svr.healthChecker.Close()
}
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@ -2620,7 +2612,11 @@ func Test_CheckHealth(t *testing.T) {
})
t.Run("ok", func(t *testing.T) {
svr := newServer(true, true, &meta{
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@ -2638,8 +2634,7 @@ func Test_CheckHealth(t *testing.T) {
},
},
},
})
defer svr.healthChecker.Close()
}
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)

View File

@ -35,7 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@ -1588,24 +1588,20 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
}, nil
}
latestCheckResult := s.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}
func (s *Server) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()
checkResults := s.sessionManager.CheckDNHealth(ctx)
for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched))
err := s.sessionManager.CheckHealth(ctx)
if err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
for collectionID, failReason := range CheckCheckPointsHealth(s.meta) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed))
if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
return checkResults
if err = CheckCheckPointsHealth(s.meta); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
return componentutil.CheckHealthRespWithErr(nil), nil
}
func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {

View File

@ -19,7 +19,6 @@ package session
import (
"context"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
@ -32,7 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@ -71,7 +69,7 @@ type DataNodeManager interface {
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
DropImport(nodeID int64, in *datapb.DropImportRequest) error
CheckDNHealth(ctx context.Context) *healthcheck.Result
CheckHealth(ctx context.Context) error
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
Close()
@ -509,44 +507,28 @@ func (c *DataNodeManagerImpl) DropImport(nodeID int64, in *datapb.DropImportRequ
return merr.CheckRPCCall(status, err)
}
func (c *DataNodeManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
ids := c.GetSessionIDs()
func (c *DataNodeManagerImpl) CheckHealth(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)
ids := c.GetSessionIDs()
for _, nodeID := range ids {
nodeID := nodeID
wg.Add(1)
go func() {
defer wg.Done()
datanodeClient, err := c.getClient(ctx, nodeID)
group.Go(func() error {
cli, err := c.getClient(ctx, nodeID)
if err != nil {
err = fmt.Errorf("failed to get node:%d: %v", nodeID, err)
return
return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
}
checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err)
wlock.Lock()
result.AppendUnhealthyClusterMsg(
healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
wlock.Unlock()
return
sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return err
}
if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
wlock.Lock()
result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
wlock.Unlock()
}
}()
err = merr.AnalyzeState("DataNode", nodeID, sta)
return err
})
}
wg.Wait()
return result
return group.Wait()
}
func (c *DataNodeManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) {

View File

@ -6,8 +6,6 @@ import (
context "context"
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
healthcheck "github.com/milvus-io/milvus/internal/util/healthcheck"
mock "github.com/stretchr/testify/mock"
typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"
@ -119,50 +117,48 @@ func (_c *MockDataNodeManager_CheckChannelOperationProgress_Call) RunAndReturn(r
return _c
}
// CheckDNHealth provides a mock function with given fields: ctx
func (_m *MockDataNodeManager) CheckDNHealth(ctx context.Context) *healthcheck.Result {
// CheckHealth provides a mock function with given fields: ctx
func (_m *MockDataNodeManager) CheckHealth(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for CheckDNHealth")
panic("no return value specified for CheckHealth")
}
var r0 *healthcheck.Result
if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok {
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*healthcheck.Result)
}
r0 = ret.Error(0)
}
return r0
}
// MockDataNodeManager_CheckDNHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckDNHealth'
type MockDataNodeManager_CheckDNHealth_Call struct {
// MockDataNodeManager_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockDataNodeManager_CheckHealth_Call struct {
*mock.Call
}
// CheckDNHealth is a helper method to define mock.On call
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockDataNodeManager_Expecter) CheckDNHealth(ctx interface{}) *MockDataNodeManager_CheckDNHealth_Call {
return &MockDataNodeManager_CheckDNHealth_Call{Call: _e.mock.On("CheckDNHealth", ctx)}
func (_e *MockDataNodeManager_Expecter) CheckHealth(ctx interface{}) *MockDataNodeManager_CheckHealth_Call {
return &MockDataNodeManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)}
}
func (_c *MockDataNodeManager_CheckDNHealth_Call) Run(run func(ctx context.Context)) *MockDataNodeManager_CheckDNHealth_Call {
func (_c *MockDataNodeManager_CheckHealth_Call) Run(run func(ctx context.Context)) *MockDataNodeManager_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockDataNodeManager_CheckDNHealth_Call) Return(_a0 *healthcheck.Result) *MockDataNodeManager_CheckDNHealth_Call {
func (_c *MockDataNodeManager_CheckHealth_Call) Return(_a0 error) *MockDataNodeManager_CheckHealth_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockDataNodeManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) *healthcheck.Result) *MockDataNodeManager_CheckDNHealth_Call {
func (_c *MockDataNodeManager_CheckHealth_Call) RunAndReturn(run func(context.Context) error) *MockDataNodeManager_CheckHealth_Call {
_c.Call.Return(run)
return _c
}

View File

@ -285,8 +285,7 @@ func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 {
return binlogIDs
}
func CheckCheckPointsHealth(meta *meta) map[int64]string {
checkResult := make(map[int64]string)
func CheckCheckPointsHealth(meta *meta) error {
for channel, cp := range meta.GetChannelCheckpoints() {
collectionID := funcutil.GetCollectionIDFromVChannel(channel)
if collectionID == -1 {
@ -300,30 +299,31 @@ func CheckCheckPointsHealth(meta *meta) map[int64]string {
ts, _ := tsoutil.ParseTS(cp.Timestamp)
lag := time.Since(ts)
if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) {
checkResult[collectionID] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel)
return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes()))
}
}
return checkResult
return nil
}
func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) map[int64]string {
func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error {
collIDs := meta.ListCollections()
checkResult := make(map[int64]string)
for _, collID := range collIDs {
collInfo := meta.GetCollection(collID)
if collInfo == nil {
log.RatedWarn(60, "collection info is nil, skip it", zap.Int64("collectionID", collID))
log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
continue
}
for _, channelName := range collInfo.VChannelNames {
_, err := channelManager.FindWatcher(channelName)
if err != nil {
checkResult[collID] = fmt.Sprintf("channel:%s is not watched", channelName)
log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
zap.String("channelName", channelName), zap.Error(err))
return err
}
}
}
return checkResult
return nil
}
func createStorageConfig() *indexpb.StorageConfig {

View File

@ -52,7 +52,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
return nil, err
}
minFGChannel, minFGTt := node.flowgraphManager.GetMinTTFlowGraph()
minFGChannel, minFGTt := util.GetRateCollector().GetMinFlowGraphTt()
return &metricsinfo.DataNodeQuotaMetrics{
Hms: metricsinfo.HardwareMetrics{},
Rms: rms,

View File

@ -22,7 +22,6 @@ package datanode
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
@ -37,7 +36,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@ -47,7 +45,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -576,20 +573,3 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo
log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID()))
return merr.Success(), nil
}
func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Status(err),
Reasons: []string{err.Error()},
}, nil
}
maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
minFGChannel, minFGTt := node.flowgraphManager.GetMinTTFlowGraph()
if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil {
msg := healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed)
return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil
}
return healthcheck.OK(), nil
}

View File

@ -110,7 +110,6 @@ func (s *DataNodeServicesSuite) SetupTest() {
s.Require().NoError(err)
s.node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode"))
s.node.flowgraphManager = pipeline.NewFlowgraphManager()
paramtable.SetNodeID(1)
}
@ -1162,41 +1161,6 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
})
}
func (s *DataNodeServicesSuite) TestCheckHealth() {
s.Run("node not healthy", func() {
s.SetupTest()
s.node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := s.node.CheckHealth(ctx, nil)
s.NoError(err)
s.False(merr.Ok(resp.GetStatus()))
s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
})
s.Run("exceeded timetick lag on pipeline", func() {
s.SetupTest()
fgm := pipeline.NewMockFlowgraphManager(s.T())
fgm.EXPECT().GetMinTTFlowGraph().Return("timetick-lag-ch", uint64(3600)).Once()
s.node.flowgraphManager = fgm
ctx := context.Background()
resp, err := s.node.CheckHealth(ctx, nil)
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.False(resp.GetIsHealthy())
s.NotEmpty(resp.Reasons)
})
s.Run("ok", func() {
s.SetupTest()
ctx := context.Background()
resp, err := s.node.CheckHealth(ctx, nil)
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.True(resp.GetIsHealthy())
s.Empty(resp.Reasons)
})
}
func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
s.Run("node not healthy", func() {
s.SetupTest()

View File

@ -281,9 +281,3 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact
return client.DropCompactionPlan(ctx, req)
})
}
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) {
return client.CheckHealth(ctx, req)
})
}

View File

@ -410,7 +410,3 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*
func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
return s.datanode.DropCompactionPlan(ctx, req)
}
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return s.datanode.CheckHealth(ctx, req)
}

View File

@ -185,10 +185,6 @@ func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropC
return m.status, m.err
}
func (m *MockDataNode) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{}, m.err
}
// /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func Test_NewServer(t *testing.T) {
paramtable.Init()

View File

@ -360,9 +360,3 @@ func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchReques
return client.DeleteBatch(ctx, req)
})
}
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) {
return client.CheckHealth(ctx, req)
})
}

View File

@ -394,7 +394,3 @@ func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commo
func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
return s.querynode.DeleteBatch(ctx, req)
}
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return s.querynode.CheckHealth(ctx, req)
}

View File

@ -22,6 +22,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
@ -42,7 +43,6 @@ type FlowgraphManager interface {
GetFlowgraphCount() int
GetCollectionIDs() []int64
GetMinTTFlowGraph() (string, typeutil.Timestamp)
GetChannelsJSON() string
GetSegmentsJSON() string
Close()
@ -76,6 +76,7 @@ func (fm *fgManagerImpl) RemoveFlowgraph(channel string) {
fm.flowgraphs.Remove(channel)
metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
util.GetRateCollector().RemoveFlowGraphChannel(channel)
}
}
@ -119,22 +120,6 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
return collectionSet.Collect()
}
// GetMinTTFlowGraph returns the vchannel and minimal time tick of flow graphs.
func (fm *fgManagerImpl) GetMinTTFlowGraph() (string, typeutil.Timestamp) {
minTt := typeutil.MaxTimestamp
var channel string
fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
if minTt > latestTimeTick {
minTt = latestTimeTick
channel = ch
}
return true
})
return channel, minTt
}
// GetChannelsJSON returns all channels in json format.
func (fm *fgManagerImpl) GetChannelsJSON() string {
var channels []*metricsinfo.Channel

View File

@ -309,61 +309,6 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s
return _c
}
// GetMinTTFlowGraph provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetMinTTFlowGraph() (string, uint64) {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for GetMinTTFlowGraph")
}
var r0 string
var r1 uint64
if rf, ok := ret.Get(0).(func() (string, uint64)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
if rf, ok := ret.Get(1).(func() uint64); ok {
r1 = rf()
} else {
r1 = ret.Get(1).(uint64)
}
return r0, r1
}
// MockFlowgraphManager_GetMinTTFlowGraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMinTTFlowGraph'
type MockFlowgraphManager_GetMinTTFlowGraph_Call struct {
*mock.Call
}
// GetMinTTFlowGraph is a helper method to define mock.On call
func (_e *MockFlowgraphManager_Expecter) GetMinTTFlowGraph() *MockFlowgraphManager_GetMinTTFlowGraph_Call {
return &MockFlowgraphManager_GetMinTTFlowGraph_Call{Call: _e.mock.On("GetMinTTFlowGraph")}
}
func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) Run(run func()) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) Return(_a0 string, _a1 uint64) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) RunAndReturn(run func() (string, uint64)) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
_c.Call.Return(run)
return _c
}
// GetSegmentsJSON provides a mock function with given fields:
func (_m *MockFlowgraphManager) GetSegmentsJSON() string {
ret := _m.Called()

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// rateCol is global RateCollector in DataNode.
@ -37,6 +38,7 @@ type RateCollector struct {
*ratelimitutil.RateCollector
flowGraphTtMu sync.Mutex
flowGraphTt map[string]typeutil.Timestamp
}
func initGlobalRateCollector() {
@ -73,5 +75,35 @@ func newRateCollector() (*RateCollector, error) {
}
return &RateCollector{
RateCollector: rc,
flowGraphTt: make(map[string]typeutil.Timestamp),
}, nil
}
// UpdateFlowGraphTt updates RateCollector's flow graph time tick.
func (r *RateCollector) UpdateFlowGraphTt(channel string, t typeutil.Timestamp) {
r.flowGraphTtMu.Lock()
defer r.flowGraphTtMu.Unlock()
r.flowGraphTt[channel] = t
}
// RemoveFlowGraphChannel removes channel from flowGraphTt.
func (r *RateCollector) RemoveFlowGraphChannel(channel string) {
r.flowGraphTtMu.Lock()
defer r.flowGraphTtMu.Unlock()
delete(r.flowGraphTt, channel)
}
// GetMinFlowGraphTt returns the vchannel and minimal time tick of flow graphs.
func (r *RateCollector) GetMinFlowGraphTt() (string, typeutil.Timestamp) {
r.flowGraphTtMu.Lock()
defer r.flowGraphTtMu.Unlock()
minTt := typeutil.MaxTimestamp
var channel string
for c, t := range r.flowGraphTt {
if minTt > t {
minTt = t
channel = c
}
}
return channel, minTt
}

View File

@ -0,0 +1,42 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestRateCollector(t *testing.T) {
t.Run("test FlowGraphTt", func(t *testing.T) {
collector, err := newRateCollector()
assert.NoError(t, err)
c, minTt := collector.GetMinFlowGraphTt()
assert.Equal(t, "", c)
assert.Equal(t, typeutil.MaxTimestamp, minTt)
collector.UpdateFlowGraphTt("channel1", 100)
collector.UpdateFlowGraphTt("channel2", 200)
collector.UpdateFlowGraphTt("channel3", 50)
c, minTt = collector.GetMinFlowGraphTt()
assert.Equal(t, "channel3", c)
assert.Equal(t, typeutil.Timestamp(50), minTt)
})
}

View File

@ -91,61 +91,6 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func
return _c
}
// CheckHealth provides a mock function with given fields: _a0, _a1
func (_m *MockDataNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockDataNode_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.CheckHealthRequest
func (_e *MockDataNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckHealth_Call {
return &MockDataNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
}
func (_c *MockDataNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockDataNode_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
})
return _c
}
func (_c *MockDataNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNode_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockDataNode_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// CompactionV2 provides a mock function with given fields: _a0, _a1
func (_m *MockDataNode) CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -105,76 +105,6 @@ func (_c *MockDataNodeClient_CheckChannelOperationProgress_Call) RunAndReturn(ru
return _c
}
// CheckHealth provides a mock function with given fields: ctx, in, opts
func (_m *MockDataNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockDataNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockDataNodeClient_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
// - in *milvuspb.CheckHealthRequest
// - opts ...grpc.CallOption
func (_e *MockDataNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckHealth_Call {
return &MockDataNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockDataNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockDataNodeClient_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...)
})
return _c
}
func (_c *MockDataNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNodeClient_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockDataNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockDataNodeClient_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockDataNodeClient) Close() error {
ret := _m.Called()

View File

@ -30,61 +30,6 @@ func (_m *MockQueryNode) EXPECT() *MockQueryNode_Expecter {
return &MockQueryNode_Expecter{mock: &_m.Mock}
}
// CheckHealth provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockQueryNode_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.CheckHealthRequest
func (_e *MockQueryNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNode_CheckHealth_Call {
return &MockQueryNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
}
func (_c *MockQueryNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNode_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
})
return _c
}
func (_c *MockQueryNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNode_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNode_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// Delete provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -31,76 +31,6 @@ func (_m *MockQueryNodeClient) EXPECT() *MockQueryNodeClient_Expecter {
return &MockQueryNodeClient_Expecter{mock: &_m.Mock}
}
// CheckHealth provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockQueryNodeClient_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
// - in *milvuspb.CheckHealthRequest
// - opts ...grpc.CallOption
func (_e *MockQueryNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_CheckHealth_Call {
return &MockQueryNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockQueryNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...)
})
return _c
}
func (_c *MockQueryNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeClient_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeClient_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// Close provides a mock function with given fields:
func (_m *MockQueryNodeClient) Close() error {
ret := _m.Called()

View File

@ -137,8 +137,6 @@ service DataNode {
rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {}
rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {}
rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {}
}
message FlushRequest {

View File

@ -175,9 +175,7 @@ service QueryNode {
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) {
}
rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {}
}
}
// --------------------QueryCoord grpc request and response proto------------------

View File

@ -29,61 +29,6 @@ func (_m *MockQueryNodeServer) EXPECT() *MockQueryNodeServer_Expecter {
return &MockQueryNodeServer_Expecter{mock: &_m.Mock}
}
// CheckHealth provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
ret := _m.Called(_a0, _a1)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeServer_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockQueryNodeServer_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *milvuspb.CheckHealthRequest
func (_e *MockQueryNodeServer_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_CheckHealth_Call {
return &MockQueryNodeServer_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
}
func (_c *MockQueryNodeServer_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNodeServer_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
})
return _c
}
func (_c *MockQueryNodeServer_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeServer_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeServer_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeServer_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// Delete provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) {
ret := _m.Called(_a0, _a1)

View File

@ -55,7 +55,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
@ -139,8 +138,6 @@ type Server struct {
proxyClientManager proxyutil.ProxyClientManagerInterface
metricsRequest *metricsinfo.MetricsRequest
healthChecker *healthcheck.Checker
}
func NewQueryCoord(ctx context.Context) (*Server, error) {
@ -427,8 +424,6 @@ func (s *Server) initQueryCoord() error {
// Init load status cache
meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()
interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init querycoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return err
}
@ -572,7 +567,6 @@ func (s *Server) startQueryCoord() error {
s.startServerLoop()
s.afterStart()
s.healthChecker.Start()
s.UpdateStateCode(commonpb.StateCode_Healthy)
sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.GetServerID())
return nil
@ -611,9 +605,7 @@ func (s *Server) Stop() error {
// FOLLOW the dependence graph:
// job scheduler -> checker controller -> task scheduler -> dist controller -> cluster -> session
// observers -> dist controller
if s.healthChecker != nil {
s.healthChecker.Close()
}
if s.jobScheduler != nil {
log.Info("stop job scheduler...")
s.jobScheduler.Stop()

View File

@ -20,7 +20,6 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
@ -36,7 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/job"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -914,20 +913,16 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
return &milvuspb.CheckHealthResponse{Status: merr.Status(err), IsHealthy: false, Reasons: []string{err.Error()}}, nil
}
latestCheckResult := s.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}
func (s *Server) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()
checkResults := s.broadcastCheckHealth(ctx)
for collectionID, failReason := range utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CollectionQueryable))
errReasons, err := s.checkNodeHealth(ctx)
if err != nil || len(errReasons) != 0 {
return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil
}
return checkResults
if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil {
log.Ctx(ctx).Warn("some collection is not queryable during health check", zap.Error(err))
}
return componentutil.CheckHealthRespWithErr(nil), nil
}
func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
@ -958,39 +953,6 @@ func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
return errReasons, err
}
func (s *Server) broadcastCheckHealth(ctx context.Context) *healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
for _, node := range s.nodeMgr.GetAll() {
node := node
wg.Add(1)
go func() {
defer wg.Done()
checkHealthResp, err := s.cluster.CheckHealth(ctx, node.ID())
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
err = fmt.Errorf("CheckHealth fails for querynode:%d, %w", node.ID(), err)
wlock.Lock()
result.AppendUnhealthyClusterMsg(
healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.ID(), err.Error(), healthcheck.NodeHealthCheck))
wlock.Unlock()
return
}
if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
wlock.Lock()
result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
wlock.Unlock()
}
}()
}
wg.Wait()
return result
}
func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(
zap.String("rgName", req.GetResourceGroup()),

View File

@ -47,7 +47,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
@ -171,13 +170,6 @@ func (suite *ServiceSuite) SetupTest() {
}))
suite.meta.ResourceManager.HandleNodeUp(context.TODO(), node)
}
suite.cluster = session.NewMockCluster(suite.T())
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(&milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
IsHealthy: true,
Reasons: []string{},
}, nil).Maybe()
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
@ -1635,9 +1627,6 @@ func (suite *ServiceSuite) TestCheckHealth() {
suite.loadAll()
ctx := context.Background()
server := suite.server
server.healthChecker = healthcheck.NewChecker(50*time.Millisecond, suite.server.healthCheckFn)
server.healthChecker.Start()
defer server.healthChecker.Close()
assertCheckHealthResult := func(isHealthy bool) {
resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@ -1650,38 +1639,28 @@ func (suite *ServiceSuite) TestCheckHealth() {
}
}
setNodeSate := func(isHealthy bool, isRPCFail bool) {
var resp *milvuspb.CheckHealthResponse
if isHealthy {
resp = healthcheck.OK()
} else {
resp = healthcheck.GetCheckHealthResponseFromClusterMsg(healthcheck.NewUnhealthyClusterMsg("dn", 1, "check fails", healthcheck.NodeHealthCheck))
}
resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
if isRPCFail {
resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_ForceDeny}
}
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Unset()
suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(resp, nil).Maybe()
time.Sleep(1 * time.Second)
setNodeSate := func(state commonpb.StateCode) {
// Test for components state fail
suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Unset()
suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(
&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: state},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
},
nil).Maybe()
}
// Test for server is not healthy
server.UpdateStateCode(commonpb.StateCode_Initializing)
assertCheckHealthResult(false)
// Test for check health has some error reasons
setNodeSate(false, false)
server.UpdateStateCode(commonpb.StateCode_Healthy)
assertCheckHealthResult(false)
// Test for check health rpc fail
setNodeSate(true, true)
// Test for components state fail
setNodeSate(commonpb.StateCode_Abnormal)
server.UpdateStateCode(commonpb.StateCode_Healthy)
assertCheckHealthResult(false)
// Test for check load percentage fail
setNodeSate(true, false)
setNodeSate(commonpb.StateCode_Healthy)
assertCheckHealthResult(true)
// Test for check channel ok
@ -1703,14 +1682,7 @@ func (suite *ServiceSuite) TestCheckHealth() {
for _, node := range suite.nodes {
suite.nodeMgr.Stopping(node)
}
paramtable.Get().Save(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key)
time.Sleep(1500 * time.Millisecond)
resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
suite.NoError(err)
suite.Equal(resp.IsHealthy, true)
suite.NotEmpty(resp.Reasons)
assertCheckHealthResult(true)
}
func (suite *ServiceSuite) TestGetShardLeaders() {

View File

@ -52,7 +52,6 @@ type Cluster interface {
GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error)
GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error)
CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error)
Start()
Stop()
}
@ -273,20 +272,6 @@ func (c *QueryCluster) send(ctx context.Context, nodeID int64, fn func(cli types
return nil
}
func (c *QueryCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) {
var (
resp *milvuspb.CheckHealthResponse
err error
)
err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) {
resp, err = cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
})
if err1 != nil {
return nil, err1
}
return resp, err
}
type clients struct {
sync.RWMutex
clients map[int64]types.QueryNodeClient // nodeID -> client

View File

@ -27,61 +27,6 @@ func (_m *MockCluster) EXPECT() *MockCluster_Expecter {
return &MockCluster_Expecter{mock: &_m.Mock}
}
// CheckHealth provides a mock function with given fields: ctx, nodeID
func (_m *MockCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) {
ret := _m.Called(ctx, nodeID)
var r0 *milvuspb.CheckHealthResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)); ok {
return rf(ctx, nodeID)
}
if rf, ok := ret.Get(0).(func(context.Context, int64) *milvuspb.CheckHealthResponse); ok {
r0 = rf(ctx, nodeID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
r1 = rf(ctx, nodeID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockCluster_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
type MockCluster_CheckHealth_Call struct {
*mock.Call
}
// CheckHealth is a helper method to define mock.On call
// - ctx context.Context
// - nodeID int64
func (_e *MockCluster_Expecter) CheckHealth(ctx interface{}, nodeID interface{}) *MockCluster_CheckHealth_Call {
return &MockCluster_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, nodeID)}
}
func (_c *MockCluster_CheckHealth_Call) Run(run func(ctx context.Context, nodeID int64)) *MockCluster_CheckHealth_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64))
})
return _c
}
func (_c *MockCluster_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockCluster_CheckHealth_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockCluster_CheckHealth_Call) RunAndReturn(run func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)) *MockCluster_CheckHealth_Call {
_c.Call.Return(run)
return _c
}
// GetComponentStates provides a mock function with given fields: ctx, nodeID
func (_m *MockCluster) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) {
ret := _m.Called(ctx, nodeID)

View File

@ -73,13 +73,13 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target
for segmentID, info := range segmentDist {
_, exist := leader.Segments[segmentID]
if !exist {
log.RatedWarn(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID))
return merr.WrapErrSegmentLack(segmentID)
}
l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID
if l0WithWrongLocation {
log.RatedWarn(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID))
log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID))
return merr.WrapErrSegmentLack(segmentID)
}
}
@ -113,6 +113,8 @@ func GetShardLeadersWithChannels(ctx context.Context, m *meta.Meta, targetMgr me
) ([]*querypb.ShardLeadersList, error) {
ret := make([]*querypb.ShardLeadersList, 0)
for _, channel := range channels {
log := log.With(zap.String("channel", channel.GetChannelName()))
var channelErr error
leaders := dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName()))
if len(leaders) == 0 {
@ -130,7 +132,7 @@ func GetShardLeadersWithChannels(ctx context.Context, m *meta.Meta, targetMgr me
if len(readableLeaders) == 0 {
msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName())
log.RatedWarn(60, msg, zap.Error(channelErr))
log.Warn(msg, zap.Error(channelErr))
err := merr.WrapErrChannelNotAvailable(channel.GetChannelName(), channelErr.Error())
return nil, err
}
@ -183,9 +185,8 @@ func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr meta.TargetMan
}
// CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection
func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) map[int64]string {
maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second)
checkResult := make(map[int64]string)
func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
for _, coll := range m.GetAllCollections(ctx) {
err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll)
// the collection is not queryable, if meet following conditions:
@ -193,10 +194,15 @@ func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta
// 2. Collection is not starting to release
// 3. The load percentage has not been updated in the last 5 minutes.
if err != nil && m.Exist(ctx, coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval {
checkResult[coll.CollectionID] = err.Error()
log.Ctx(ctx).Warn("collection not querable",
zap.Int64("collectionID", coll.CollectionID),
zap.Time("lastUpdated", coll.UpdatedAt),
zap.Duration("maxInterval", maxInterval),
zap.Error(err))
return err
}
}
return checkResult
return nil
}
// checkCollectionQueryable check all channels are watched and all segments are loaded for this collection

View File

@ -54,7 +54,13 @@ func getRateMetric() ([]metricsinfo.RateMetric, error) {
return rms, nil
}
func getMinTSafe(node *QueryNode) (string, uint64) {
// getQuotaMetrics returns QueryNodeQuotaMetrics.
func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) {
rms, err := getRateMetric()
if err != nil {
return nil, err
}
minTsafeChannel := ""
minTsafe := uint64(math.MaxUint64)
node.delegators.Range(func(channel string, delegator delegator.ShardDelegator) bool {
@ -65,17 +71,7 @@ func getMinTSafe(node *QueryNode) (string, uint64) {
}
return true
})
return minTsafeChannel, minTsafe
}
// getQuotaMetrics returns QueryNodeQuotaMetrics.
func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) {
rms, err := getRateMetric()
if err != nil {
return nil, err
}
minTsafeChannel, minTsafe := getMinTSafe(node)
collections := node.manager.Collection.ListWithName()
nodeID := fmt.Sprint(node.GetNodeID())

View File

@ -42,7 +42,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tasks"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/searchutil/scheduler"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
@ -55,7 +54,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -1386,25 +1384,6 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
return merr.Success(), nil
}
func (node *QueryNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if err := node.lifetime.Add(merr.IsHealthy); err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Status(err),
Reasons: []string{err.Error()},
}, nil
}
defer node.lifetime.Done()
maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
minTsafeChannel, minTsafe := getMinTSafe(node)
if err := ratelimitutil.CheckTimeTickDelay(minTsafeChannel, minTsafe, maxDelay); err != nil {
msg := healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed)
return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil
}
return healthcheck.OK(), nil
}
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {

View File

@ -98,7 +98,7 @@ func (suite *ServiceSuite) SetupSuite() {
paramtable.Init()
paramtable.Get().Save(paramtable.Get().CommonCfg.GCEnabled.Key, "false")
suite.rootPath = path.Join("/tmp/milvus/test", suite.T().Name())
suite.rootPath = suite.T().Name()
suite.collectionID = 111
suite.collectionName = "test-collection"
suite.partitionIDs = []int64{222}
@ -2222,44 +2222,6 @@ func (suite *ServiceSuite) TestLoadPartition() {
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
}
func (suite *ServiceSuite) TestCheckHealth() {
suite.Run("node not healthy", func() {
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := suite.node.CheckHealth(ctx, nil)
suite.NoError(err)
suite.False(merr.Ok(resp.GetStatus()))
suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
})
suite.Run("exceeded timetick lag on pipeline", func() {
sd1 := delegator.NewMockShardDelegator(suite.T())
sd1.EXPECT().GetTSafe().Return(100)
sd1.EXPECT().Close().Maybe()
suite.node.delegators.Insert("timetick-lag-ch", sd1)
defer suite.node.delegators.GetAndRemove("timetick-lag-ch")
ctx := context.Background()
suite.node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := suite.node.CheckHealth(ctx, nil)
suite.NoError(err)
suite.True(merr.Ok(resp.GetStatus()))
suite.False(resp.GetIsHealthy())
suite.NotEmpty(resp.Reasons)
})
suite.Run("ok", func() {
ctx := context.Background()
suite.node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := suite.node.CheckHealth(ctx, nil)
suite.NoError(err)
suite.True(merr.Ok(resp.GetStatus()))
suite.True(resp.GetIsHealthy())
suite.Empty(resp.Reasons)
})
}
func TestQueryNodeService(t *testing.T) {
suite.Run(t, new(ServiceSuite))
}

View File

@ -405,7 +405,6 @@ func newMockProxy() *mockProxy {
func newTestCore(opts ...Opt) *Core {
c := &Core{
ctx: context.TODO(),
metricsRequest: metricsinfo.NewMetricsRequest(),
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}},
}

View File

@ -32,6 +32,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -50,7 +51,6 @@ import (
tso2 "github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@ -131,7 +131,6 @@ type Core struct {
activateFunc func() error
metricsRequest *metricsinfo.MetricsRequest
healthChecker *healthcheck.Checker
}
// --------------------- function --------------------------
@ -502,8 +501,6 @@ func (c *Core) initInternal() error {
return err
}
interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
c.healthChecker = healthcheck.NewChecker(interval, c.healthCheckFn)
log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address))
return nil
}
@ -798,7 +795,6 @@ func (c *Core) startInternal() error {
}()
c.startServerLoop()
c.healthChecker.Start()
c.UpdateStateCode(commonpb.StateCode_Healthy)
sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID)
log.Info("rootcoord startup successfully")
@ -860,10 +856,6 @@ func (c *Core) revokeSession() {
// Stop stops rootCoord.
func (c *Core) Stop() error {
c.UpdateStateCode(commonpb.StateCode_Abnormal)
if c.healthChecker != nil {
c.healthChecker.Close()
}
c.stopExecutor()
c.stopScheduler()
if c.proxyWatcher != nil {
@ -3130,40 +3122,53 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest)
}, nil
}
latestCheckResult := c.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}
func (c *Core) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(c.ctx, timeout)
defer cancel()
group, ctx := errgroup.WithContext(ctx)
errs := typeutil.NewConcurrentSet[error]()
proxyClients := c.proxyClientManager.GetProxyClients()
wg := sync.WaitGroup{}
lock := sync.Mutex{}
result := healthcheck.NewResult()
proxyClients.Range(func(key int64, value types.ProxyClient) bool {
nodeID := key
proxyClient := value
wg.Add(1)
go func() {
defer wg.Done()
resp, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
err = merr.AnalyzeComponentStateResp(typeutil.ProxyRole, nodeID, resp, err)
lock.Lock()
defer lock.Unlock()
group.Go(func() error {
sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
result.AppendUnhealthyClusterMsg(healthcheck.NewUnhealthyClusterMsg(typeutil.ProxyRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
errs.Insert(err)
return err
}
}()
err = merr.AnalyzeState("Proxy", nodeID, sta)
if err != nil {
errs.Insert(err)
}
return err
})
return true
})
wg.Wait()
return result
maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
if maxDelay > 0 {
group.Go(func() error {
err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay)
if err != nil {
errs.Insert(err)
}
return err
})
}
err := group.Wait()
if err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Success(),
IsHealthy: false,
Reasons: lo.Map(errs.Collect(), func(e error, i int) string {
return err.Error()
}),
}, nil
}
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil
}
func (c *Core) CreatePrivilegeGroup(ctx context.Context, in *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
@ -39,7 +40,6 @@ import (
mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks"
"github.com/milvus-io/milvus/internal/util/dependency"
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/util"
@ -1479,6 +1479,65 @@ func TestRootCoord_AlterCollection(t *testing.T) {
}
func TestRootCoord_CheckHealth(t *testing.T) {
getQueryCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) {
clusterTopology := metricsinfo.QueryClusterTopology{
ConnectedNodes: []metricsinfo.QueryNodeInfos{
{
QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{
MinFlowGraphChannel: "ch1",
MinFlowGraphTt: tt,
NumFlowGraph: 1,
},
},
},
},
}
resp, _ := metricsinfo.MarshalTopology(metricsinfo.QueryCoordTopology{Cluster: clusterTopology})
return &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, 0),
}, nil
}
getDataCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) {
clusterTopology := metricsinfo.DataClusterTopology{
ConnectedDataNodes: []metricsinfo.DataNodeInfos{
{
QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{
Fgm: metricsinfo.FlowGraphMetric{
MinFlowGraphChannel: "ch1",
MinFlowGraphTt: tt,
NumFlowGraph: 1,
},
},
},
},
}
resp, _ := metricsinfo.MarshalTopology(metricsinfo.DataCoordTopology{Cluster: clusterTopology})
return &milvuspb.GetMetricsResponse{
Status: merr.Success(),
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, 0),
}, nil
}
querynodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0)
datanodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Minute), 0)
dcClient := mocks.NewMockDataCoordClient(t)
dcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getDataCoordMetricsFunc(datanodeTT))
qcClient := mocks.NewMockQueryCoordClient(t)
qcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getQueryCoordMetricsFunc(querynodeTT))
errDataCoordClient := mocks.NewMockDataCoordClient(t)
errDataCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
errQueryCoordClient := mocks.NewMockQueryCoordClient(t)
errQueryCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error"))
t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
c := newTestCore(withAbnormalCode())
@ -1488,13 +1547,25 @@ func TestRootCoord_CheckHealth(t *testing.T) {
assert.NotEmpty(t, resp.Reasons)
})
t.Run("proxy health check fail with invalid proxy", func(t *testing.T) {
c := newTestCore(withHealthyCode(), withInvalidProxyManager())
c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn)
c.healthChecker.Start()
defer c.healthChecker.Close()
t.Run("ok with disabled tt lag configuration", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "-1")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
time.Sleep(50 * time.Millisecond)
c := newTestCore(withHealthyCode(), withValidProxyManager())
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, true, resp.IsHealthy)
assert.Empty(t, resp.Reasons)
})
t.Run("proxy health check fail with invalid proxy", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(), withInvalidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@ -1503,14 +1574,55 @@ func TestRootCoord_CheckHealth(t *testing.T) {
assert.NotEmpty(t, resp.Reasons)
})
t.Run("ok", func(t *testing.T) {
c := newTestCore(withHealthyCode(), withValidProxyManager())
c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn)
c.healthChecker.Start()
defer c.healthChecker.Close()
t.Run("proxy health check fail with get metrics error", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
time.Sleep(50 * time.Millisecond)
{
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(errQueryCoordClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
}
{
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(errDataCoordClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
}
})
t.Run("ok with tt lag exceeded", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "90")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
assert.Equal(t, false, resp.IsHealthy)
assert.NotEmpty(t, resp.Reasons)
})
t.Run("ok with tt lag checking", func(t *testing.T) {
v := Params.QuotaConfig.MaxTimeTickDelay.GetValue()
Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "600")
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v)
c := newTestCore(withHealthyCode(),
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient))
ctx := context.Background()
resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)

View File

@ -20,8 +20,10 @@ import (
"context"
"fmt"
"strconv"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/json"
@ -32,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -281,3 +284,97 @@ func getProxyMetrics(ctx context.Context, proxies proxyutil.ProxyClientManagerIn
return ret, nil
}
func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, maxDelay time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, GetMetricsTimeout)
defer cancel()
now := time.Now()
group := &errgroup.Group{}
queryNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]()
dataNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]()
group.Go(func() error {
queryCoordTopology, err := getQueryCoordMetrics(ctx, queryCoord)
if err != nil {
return err
}
for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes {
qm := queryNodeMetric.QuotaMetrics
if qm != nil {
if qm.Fgm.NumFlowGraph > 0 && qm.Fgm.MinFlowGraphChannel != "" {
minTt, _ := tsoutil.ParseTS(qm.Fgm.MinFlowGraphTt)
delay := now.Sub(minTt)
if delay.Milliseconds() >= maxDelay.Milliseconds() {
queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay)
}
}
}
}
return nil
})
// get Data cluster metrics
group.Go(func() error {
dataCoordTopology, err := getDataCoordMetrics(ctx, dataCoord)
if err != nil {
return err
}
for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes {
dm := dataNodeMetric.QuotaMetrics
if dm != nil {
if dm.Fgm.NumFlowGraph > 0 && dm.Fgm.MinFlowGraphChannel != "" {
minTt, _ := tsoutil.ParseTS(dm.Fgm.MinFlowGraphTt)
delay := now.Sub(minTt)
if delay.Milliseconds() >= maxDelay.Milliseconds() {
dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay)
}
}
}
}
return nil
})
err := group.Wait()
if err != nil {
return err
}
var maxLagChannel string
var maxLag time.Duration
findMaxLagChannel := func(params ...*typeutil.ConcurrentMap[string, time.Duration]) {
for _, param := range params {
param.Range(func(k string, v time.Duration) bool {
if v > maxLag {
maxLag = v
maxLagChannel = k
}
return true
})
}
}
var errStr string
findMaxLagChannel(queryNodeTTDelay)
if maxLag > 0 && len(maxLagChannel) != 0 {
errStr = fmt.Sprintf("query max timetick lag:%s on channel:%s", maxLag, maxLagChannel)
}
maxLagChannel = ""
maxLag = 0
findMaxLagChannel(dataNodeTTDelay)
if maxLag > 0 && len(maxLagChannel) != 0 {
if errStr != "" {
errStr += ", "
}
errStr += fmt.Sprintf("data max timetick lag:%s on channel:%s", maxLag, maxLagChannel)
}
if errStr != "" {
return fmt.Errorf("max timetick lag execced threhold: %s", errStr)
}
return nil
}

View File

@ -84,3 +84,17 @@ func WaitForComponentHealthy[T interface {
}](ctx context.Context, client T, serviceName string, attempts uint, sleep time.Duration) error {
return WaitForComponentStates(ctx, client, serviceName, []commonpb.StateCode{commonpb.StateCode_Healthy}, attempts, sleep)
}
func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse {
if err != nil {
return CheckHealthRespWithErrMsg(err.Error())
}
return CheckHealthRespWithErrMsg()
}
func CheckHealthRespWithErrMsg(errMsg ...string) *milvuspb.CheckHealthResponse {
if len(errMsg) != 0 {
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errMsg}
}
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}
}

View File

@ -1,276 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package healthcheck
import (
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)
// UnHealthyLevel represents the health level of a system.
type UnHealthyLevel int
const (
// Healthy means the system is operating normally.
Healthy UnHealthyLevel = iota
// Warning indicates minor issues that might escalate.
Warning
// Critical indicates major issues that need immediate attention.
Critical
// Fatal indicates system failure.
Fatal
)
// String returns the string representation of the UnHealthyLevel.
func (u UnHealthyLevel) String() string {
switch u {
case Healthy:
return "Healthy"
case Warning:
return "Warning"
case Critical:
return "Critical"
case Fatal:
return "Fatal"
default:
return "Unknown"
}
}
type Item int
const (
ChannelsWatched Item = iota
CheckpointLagExceed
CollectionQueryable
TimeTickLagExceed
NodeHealthCheck
)
func getUnhealthyLevel(item Item) UnHealthyLevel {
switch item {
case ChannelsWatched:
return Fatal
case CheckpointLagExceed:
return Fatal
case TimeTickLagExceed:
return Fatal
case NodeHealthCheck:
return Fatal
case CollectionQueryable:
return Critical
default:
panic(fmt.Sprintf("unknown health check item: %d", int(item)))
}
}
type Result struct {
UnhealthyClusterMsgs []*UnhealthyClusterMsg `json:"unhealthy_cluster_msgs"`
UnhealthyCollectionMsgs []*UnhealthyCollectionMsg `json:"unhealthy_collection_msgs"`
}
func NewResult() *Result {
return &Result{}
}
func (r *Result) AppendUnhealthyClusterMsg(unm *UnhealthyClusterMsg) {
r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, unm)
}
func (r *Result) AppendUnhealthyCollectionMsgs(udm *UnhealthyCollectionMsg) {
r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, udm)
}
func (r *Result) AppendResult(other *Result) {
if other == nil {
return
}
r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, other.UnhealthyClusterMsgs...)
r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, other.UnhealthyCollectionMsgs...)
}
func (r *Result) IsEmpty() bool {
return len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0
}
func (r *Result) IsHealthy() bool {
if len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 {
return true
}
for _, unm := range r.UnhealthyClusterMsgs {
if unm.Reason.UnhealthyLevel == Fatal {
return false
}
}
for _, ucm := range r.UnhealthyCollectionMsgs {
if ucm.Reason.UnhealthyLevel == Fatal {
return false
}
}
return true
}
type UnhealthyReason struct {
UnhealthyMsg string `json:"unhealthy_msg"`
UnhealthyLevel UnHealthyLevel `json:"unhealthy_level"`
}
type UnhealthyClusterMsg struct {
Role string `json:"role"`
NodeID int64 `json:"node_id"`
Reason *UnhealthyReason `json:"reason"`
}
func NewUnhealthyClusterMsg(role string, nodeID int64, unhealthyMsg string, item Item) *UnhealthyClusterMsg {
return &UnhealthyClusterMsg{
Role: role,
NodeID: nodeID,
Reason: &UnhealthyReason{
UnhealthyMsg: unhealthyMsg,
UnhealthyLevel: getUnhealthyLevel(item),
},
}
}
type UnhealthyCollectionMsg struct {
DatabaseID int64 `json:"database_id"`
CollectionID int64 `json:"collection_id"`
Reason *UnhealthyReason `json:"reason"`
}
func NewUnhealthyCollectionMsg(collectionID int64, unhealthyMsg string, item Item) *UnhealthyCollectionMsg {
return &UnhealthyCollectionMsg{
CollectionID: collectionID,
Reason: &UnhealthyReason{
UnhealthyMsg: unhealthyMsg,
UnhealthyLevel: getUnhealthyLevel(item),
},
}
}
type Checker struct {
sync.RWMutex
interval time.Duration
done chan struct{}
checkFn func() *Result
latestResult *Result
once sync.Once
}
func NewChecker(interval time.Duration, checkFn func() *Result) *Checker {
checker := &Checker{
interval: interval,
checkFn: checkFn,
latestResult: NewResult(),
done: make(chan struct{}, 1),
once: sync.Once{},
}
return checker
}
func (hc *Checker) Start() {
go func() {
ticker := time.NewTicker(hc.interval)
defer ticker.Stop()
log.Info("start health checker")
for {
select {
case <-ticker.C:
hc.Lock()
hc.latestResult = hc.checkFn()
hc.Unlock()
case <-hc.done:
log.Info("stop health checker")
return
}
}
}()
}
func (hc *Checker) GetLatestCheckResult() *Result {
hc.RLock()
defer hc.RUnlock()
return hc.latestResult
}
func (hc *Checker) Close() {
hc.once.Do(func() {
close(hc.done)
})
}
func GetHealthCheckResultFromResp(resp *milvuspb.CheckHealthResponse) *Result {
var r Result
if len(resp.Reasons) == 0 {
return &r
}
if len(resp.Reasons) > 1 {
log.Error("invalid check result", zap.Any("reasons", resp.Reasons))
return &r
}
err := json.Unmarshal([]byte(resp.Reasons[0]), &r)
if err != nil {
log.Error("unmarshal check result error", zap.String("error", err.Error()))
}
return &r
}
func GetCheckHealthResponseFromClusterMsg(msg ...*UnhealthyClusterMsg) *milvuspb.CheckHealthResponse {
r := &Result{UnhealthyClusterMsgs: msg}
reasons, err := json.Marshal(r)
if err != nil {
log.Error("marshal check result error", zap.String("error", err.Error()))
}
return &milvuspb.CheckHealthResponse{
Status: merr.Success(),
IsHealthy: r.IsHealthy(),
Reasons: []string{string(reasons)},
}
}
func GetCheckHealthResponseFromResult(checkResult *Result) *milvuspb.CheckHealthResponse {
if checkResult.IsEmpty() {
return OK()
}
reason, err := json.Marshal(checkResult)
if err != nil {
log.Error("marshal check result error", zap.String("error", err.Error()))
}
return &milvuspb.CheckHealthResponse{
Status: merr.Success(),
IsHealthy: checkResult.IsHealthy(),
Reasons: []string{string(reason)},
}
}
func OK() *milvuspb.CheckHealthResponse {
return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}
}

View File

@ -1,60 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package healthcheck
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func TestChecker(t *testing.T) {
expected1 := NewResult()
expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched))
expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched))
expected1.AppendUnhealthyCollectionMsgs(&UnhealthyCollectionMsg{
CollectionID: 1,
Reason: &UnhealthyReason{
UnhealthyMsg: "msg2",
UnhealthyLevel: Critical,
},
})
checkFn := func() *Result {
return expected1
}
checker := NewChecker(100*time.Millisecond, checkFn)
go checker.Start()
time.Sleep(150 * time.Millisecond)
actual1 := checker.GetLatestCheckResult()
assert.Equal(t, expected1, actual1)
assert.False(t, actual1.IsHealthy())
chr := GetCheckHealthResponseFromResult(actual1)
assert.Equal(t, merr.Success(), chr.Status)
assert.Equal(t, actual1.IsHealthy(), chr.IsHealthy)
assert.Equal(t, 1, len(chr.Reasons))
actualResult := GetHealthCheckResultFromResp(chr)
assert.Equal(t, actual1, actualResult)
checker.Close()
}

View File

@ -112,7 +112,3 @@ func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo
func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return &commonpb.Status{}, m.Err
}
func (m *GrpcDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{}, m.Err
}

View File

@ -134,10 +134,6 @@ func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.Delet
return &querypb.DeleteBatchResponse{}, m.Err
}
func (m *GrpcQueryNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{}, m.Err
}
func (m *GrpcQueryNodeClient) Close() error {
return m.Err
}

View File

@ -152,10 +152,6 @@ func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBa
return qn.QueryNode.DeleteBatch(ctx, in)
}
func (qn *qnServerWrapper) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return qn.QueryNode.CheckHealth(ctx, req)
}
func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient {
return &qnServerWrapper{
QueryNode: qn,

View File

@ -24,6 +24,7 @@ import (
"fmt"
"net"
"reflect"
"regexp"
"strconv"
"strings"
"time"
@ -290,18 +291,13 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri
}
func GetCollectionIDFromVChannel(vChannelName string) int64 {
end := strings.LastIndexByte(vChannelName, 'v')
if end <= 0 {
return -1
}
start := strings.LastIndexByte(vChannelName, '_')
if start <= 0 {
return -1
}
collectionIDStr := vChannelName[start+1 : end]
if collectionID, err := strconv.ParseInt(collectionIDStr, 0, 64); err == nil {
return collectionID
re := regexp.MustCompile(`.*_(\d+)v\d+`)
matches := re.FindStringSubmatch(vChannelName)
if len(matches) > 1 {
number, err := strconv.ParseInt(matches[1], 0, 64)
if err == nil {
return number
}
}
return -1
}

View File

@ -299,13 +299,6 @@ func IsHealthyOrStopping(stateCode commonpb.StateCode) error {
return CheckHealthy(stateCode)
}
func AnalyzeComponentStateResp(role string, nodeID int64, resp *milvuspb.ComponentStates, err error) error {
if err != nil {
return errors.Wrap(err, "service is unhealthy")
}
return AnalyzeState(role, nodeID, resp)
}
func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) error {
if err := Error(state.GetStatus()); err != nil {
return errors.Wrapf(err, "%s=%d not healthy", role, nodeID)

View File

@ -288,9 +288,6 @@ type commonConfig struct {
// Local RPC enabled for milvus internal communication when mix or standalone mode.
LocalRPCEnabled ParamItem `refreshable:"false"`
HealthCheckInterval ParamItem `refreshable:"true"`
HealthCheckRPCTimeout ParamItem `refreshable:"true"`
SyncTaskPoolReleaseTimeoutSeconds ParamItem `refreshable:"true"`
}
@ -952,22 +949,6 @@ This helps Milvus-CDC synchronize incremental data`,
}
p.LocalRPCEnabled.Init(base.mgr)
p.HealthCheckInterval = ParamItem{
Key: "common.healthcheck.interval.seconds",
Version: "2.4.8",
DefaultValue: "30",
Doc: `health check interval in seconds, default 30s`,
}
p.HealthCheckInterval.Init(base.mgr)
p.HealthCheckRPCTimeout = ParamItem{
Key: "common.healthcheck.timeout.seconds",
Version: "2.4.8",
DefaultValue: "10",
Doc: `RPC timeout for health check request`,
}
p.HealthCheckRPCTimeout.Init(base.mgr)
p.SyncTaskPoolReleaseTimeoutSeconds = ParamItem{
Key: "common.sync.taskPoolReleaseTimeoutSeconds",
DefaultValue: "60",
@ -2280,9 +2261,9 @@ If this parameter is set false, Milvus simply searches the growing segments with
p.UpdateCollectionLoadStatusInterval = ParamItem{
Key: "queryCoord.updateCollectionLoadStatusInterval",
Version: "2.4.7",
DefaultValue: "300",
DefaultValue: "5",
PanicIfEmpty: true,
Doc: "300s, max interval of updating collection loaded status for check health",
Doc: "5m, max interval of updating collection loaded status for check health",
Export: true,
}

View File

@ -131,11 +131,6 @@ func TestComponentParam(t *testing.T) {
params.Save("common.gchelper.minimumGoGC", "80")
assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt())
params.Save("common.healthcheck.interval.seconds", "60")
assert.Equal(t, time.Second*60, Params.HealthCheckInterval.GetAsDuration(time.Second))
params.Save("common.healthcheck.timeout.seconds", "5")
assert.Equal(t, 5, Params.HealthCheckRPCTimeout.GetAsInt())
assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings()))
assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings()))
@ -335,8 +330,8 @@ func TestComponentParam(t *testing.T) {
checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt()
assert.Equal(t, 2000, checkHealthRPCTimeout)
updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second)
assert.Equal(t, time.Second*300, updateInterval)
updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
assert.Equal(t, updateInterval, time.Minute*5)
assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat())
params.Save("queryCoord.globalRowCountFactor", "0.4")

View File

@ -16,13 +16,7 @@
package ratelimitutil
import (
"fmt"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
var QuotaErrorString = map[commonpb.ErrorCode]string{
commonpb.ErrorCode_ForceDeny: "access has been disabled by the administrator",
@ -34,14 +28,3 @@ var QuotaErrorString = map[commonpb.ErrorCode]string{
func GetQuotaErrorString(errCode commonpb.ErrorCode) string {
return QuotaErrorString[errCode]
}
func CheckTimeTickDelay(channel string, minTT uint64, maxDelay time.Duration) error {
if channel != "" && maxDelay > 0 {
minTt, _ := tsoutil.ParseTS(minTT)
delay := time.Since(minTt)
if delay.Milliseconds() >= maxDelay.Milliseconds() {
return fmt.Errorf("max timetick lag execced threhold, lag:%s on channel:%s", delay, channel)
}
}
return nil
}