mirror of https://github.com/milvus-io/milvus.git
fix: check collection health(queryable) fail for releasing collection (#34947)
issue: #34946
Signed-off-by: jaime <yun.zhang@zilliz.com>
pull/35205/head
parent 3641ae6611
commit fcec4c21b9
@@ -367,6 +367,7 @@ queryCoord:
   channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
   collectionObserverInterval: 200 # the interval of collection observer
   checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
+  updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for check health
   cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
   ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
   port: 19531 # TCP port of queryCoord
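Note: the new `updateCollectionLoadStatusInterval` key is a bare number. Per the parameter definition added later in this commit, it is documented as "5m" and read with `GetAsDuration(time.Minute)`, so the unit is applied at the read site rather than in the YAML. A minimal, self-contained Go sketch of that convention follows; the `Setting` type here is hypothetical and not part of Milvus.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// Setting is a hypothetical stand-in for a paramtable-style entry whose raw
// value is a unitless string ("5") and whose unit is chosen by the caller.
type Setting struct {
	raw string
}

// GetAsDuration multiplies the numeric value by the unit supplied at the call
// site, mirroring how this commit reads updateCollectionLoadStatusInterval in minutes.
func (s Setting) GetAsDuration(unit time.Duration) time.Duration {
	n, err := strconv.Atoi(s.raw)
	if err != nil {
		return 0
	}
	return time.Duration(n) * unit
}

func main() {
	interval := Setting{raw: "5"}
	fmt.Println(interval.GetAsDuration(time.Minute)) // 5m0s
}
```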
@@ -41,7 +41,7 @@ func NewChannelLevelScoreBalancer(scheduler task.Scheduler,
     nodeManager *session.NodeManager,
     dist *meta.DistributionManager,
     meta *meta.Meta,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
 ) *ChannelLevelScoreBalancer {
     return &ChannelLevelScoreBalancer{
         ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
@@ -452,7 +452,7 @@ func (g *randomPlanGenerator) generatePlans() []SegmentAssignPlan {
 type MultiTargetBalancer struct {
     *ScoreBasedBalancer
     dist *meta.DistributionManager
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
 }
 
 func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
@@ -548,7 +548,7 @@ func (b *MultiTargetBalancer) genPlanByDistributions(nodeSegments, globalNodeSeg
     return plans
 }
 
-func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr *meta.TargetManager) *MultiTargetBalancer {
+func NewMultiTargetBalancer(scheduler task.Scheduler, nodeManager *session.NodeManager, dist *meta.DistributionManager, meta *meta.Meta, targetMgr meta.TargetManagerInterface) *MultiTargetBalancer {
     return &MultiTargetBalancer{
         ScoreBasedBalancer: NewScoreBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
         dist: dist,
@@ -36,7 +36,7 @@ type RowCountBasedBalancer struct {
     *RoundRobinBalancer
     dist *meta.DistributionManager
     meta *meta.Meta
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
 }
 
 // AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count.
@@ -354,7 +354,7 @@ func NewRowCountBasedBalancer(
     nodeManager *session.NodeManager,
     dist *meta.DistributionManager,
     meta *meta.Meta,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
 ) *RowCountBasedBalancer {
     return &RowCountBasedBalancer{
         RoundRobinBalancer: NewRoundRobinBalancer(scheduler, nodeManager),
@@ -41,7 +41,7 @@ func NewScoreBasedBalancer(scheduler task.Scheduler,
     nodeManager *session.NodeManager,
     dist *meta.DistributionManager,
     meta *meta.Meta,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
 ) *ScoreBasedBalancer {
     return &ScoreBasedBalancer{
         RowCountBasedBalancer: NewRowCountBasedBalancer(scheduler, nodeManager, dist, meta, targetMgr),
@@ -41,7 +41,7 @@ type ControllerImpl struct {
     client session.Cluster
     nodeManager *session.NodeManager
     dist *meta.DistributionManager
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     scheduler task.Scheduler
 }
 
@@ -98,7 +98,7 @@ func NewDistController(
     client session.Cluster,
     nodeManager *session.NodeManager,
     dist *meta.DistributionManager,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     scheduler task.Scheduler,
 ) *ControllerImpl {
     return &ControllerImpl{
@@ -48,7 +48,7 @@ type LoadCollectionJob struct {
     meta *meta.Meta
     broker meta.Broker
     cluster session.Cluster
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     targetObserver *observers.TargetObserver
     collectionObserver *observers.CollectionObserver
     nodeMgr *session.NodeManager
@@ -61,7 +61,7 @@ func NewLoadCollectionJob(
     meta *meta.Meta,
     broker meta.Broker,
     cluster session.Cluster,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     targetObserver *observers.TargetObserver,
     collectionObserver *observers.CollectionObserver,
     nodeMgr *session.NodeManager,
@@ -239,7 +239,7 @@ type LoadPartitionJob struct {
     meta *meta.Meta
     broker meta.Broker
     cluster session.Cluster
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     targetObserver *observers.TargetObserver
     collectionObserver *observers.CollectionObserver
     nodeMgr *session.NodeManager
@@ -252,7 +252,7 @@ func NewLoadPartitionJob(
     meta *meta.Meta,
     broker meta.Broker,
     cluster session.Cluster,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     targetObserver *observers.TargetObserver,
     collectionObserver *observers.CollectionObserver,
     nodeMgr *session.NodeManager,
@@ -39,7 +39,7 @@ type ReleaseCollectionJob struct {
     meta *meta.Meta
     broker meta.Broker
     cluster session.Cluster
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     targetObserver *observers.TargetObserver
     checkerController *checkers.CheckerController
 }
@@ -50,7 +50,7 @@ func NewReleaseCollectionJob(ctx context.Context,
     meta *meta.Meta,
     broker meta.Broker,
     cluster session.Cluster,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     targetObserver *observers.TargetObserver,
     checkerController *checkers.CheckerController,
 ) *ReleaseCollectionJob {
@@ -114,7 +114,7 @@ type ReleasePartitionJob struct {
     meta *meta.Meta
     broker meta.Broker
     cluster session.Cluster
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     targetObserver *observers.TargetObserver
     checkerController *checkers.CheckerController
 }
@@ -125,7 +125,7 @@ func NewReleasePartitionJob(ctx context.Context,
     meta *meta.Meta,
     broker meta.Broker,
     cluster session.Cluster,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     targetObserver *observers.TargetObserver,
     checkerController *checkers.CheckerController,
 ) *ReleasePartitionJob {
@@ -38,12 +38,12 @@ type UndoList struct {
     ctx context.Context
     meta *meta.Meta
     cluster session.Cluster
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     targetObserver *observers.TargetObserver
 }
 
 func NewUndoList(ctx context.Context, meta *meta.Meta,
-    cluster session.Cluster, targetMgr *meta.TargetManager, targetObserver *observers.TargetObserver,
+    cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver,
 ) *UndoList {
     return &UndoList{
         ctx: ctx,
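Note: the hunks above are all the same mechanical change: struct fields and constructor parameters move from the concrete `*meta.TargetManager` to the `meta.TargetManagerInterface` interface, so a test can inject a substitute implementation. A minimal, self-contained sketch of the pattern follows; `TargetProvider`, `Checker`, and the other names are hypothetical, not Milvus types.

```go
package main

import "fmt"

// TargetProvider is a cut-down, hypothetical analogue of meta.TargetManagerInterface.
type TargetProvider interface {
	GetDmChannelsByCollection(collectionID int64) []string
}

// realTargets stands in for the concrete target manager used in production.
type realTargets struct{}

func (realTargets) GetDmChannelsByCollection(collectionID int64) []string {
	return []string{"dml-channel-0", "dml-channel-1"}
}

// fakeTargets is the kind of lightweight stub a unit test can now inject.
type fakeTargets struct{ channels []string }

func (f fakeTargets) GetDmChannelsByCollection(collectionID int64) []string {
	return f.channels
}

// Checker depends on the interface rather than the concrete manager,
// which is the point of the signature changes above.
type Checker struct {
	targets TargetProvider
}

func (c Checker) queryable(collectionID int64) bool {
	return len(c.targets.GetDmChannelsByCollection(collectionID)) > 0
}

func main() {
	fmt.Println(Checker{targets: realTargets{}}.queryable(1)) // true
	fmt.Println(Checker{targets: fakeTargets{}}.queryable(1)) // false: simulates "no channels in target"
}
```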
@@ -24,6 +24,49 @@ func (_m *MockTargetManager) EXPECT() *MockTargetManager_Expecter {
     return &MockTargetManager_Expecter{mock: &_m.Mock}
 }
 
+// CanSegmentBeMoved provides a mock function with given fields: collectionID, segmentID
+func (_m *MockTargetManager) CanSegmentBeMoved(collectionID int64, segmentID int64) bool {
+    ret := _m.Called(collectionID, segmentID)
+
+    var r0 bool
+    if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
+        r0 = rf(collectionID, segmentID)
+    } else {
+        r0 = ret.Get(0).(bool)
+    }
+
+    return r0
+}
+
+// MockTargetManager_CanSegmentBeMoved_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CanSegmentBeMoved'
+type MockTargetManager_CanSegmentBeMoved_Call struct {
+    *mock.Call
+}
+
+// CanSegmentBeMoved is a helper method to define mock.On call
+// - collectionID int64
+// - segmentID int64
+func (_e *MockTargetManager_Expecter) CanSegmentBeMoved(collectionID interface{}, segmentID interface{}) *MockTargetManager_CanSegmentBeMoved_Call {
+    return &MockTargetManager_CanSegmentBeMoved_Call{Call: _e.mock.On("CanSegmentBeMoved", collectionID, segmentID)}
+}
+
+func (_c *MockTargetManager_CanSegmentBeMoved_Call) Run(run func(collectionID int64, segmentID int64)) *MockTargetManager_CanSegmentBeMoved_Call {
+    _c.Call.Run(func(args mock.Arguments) {
+        run(args[0].(int64), args[1].(int64))
+    })
+    return _c
+}
+
+func (_c *MockTargetManager_CanSegmentBeMoved_Call) Return(_a0 bool) *MockTargetManager_CanSegmentBeMoved_Call {
+    _c.Call.Return(_a0)
+    return _c
+}
+
+func (_c *MockTargetManager_CanSegmentBeMoved_Call) RunAndReturn(run func(int64, int64) bool) *MockTargetManager_CanSegmentBeMoved_Call {
+    _c.Call.Return(run)
+    return _c
+}
+
 // GetCollectionTargetVersion provides a mock function with given fields: collectionID, scope
 func (_m *MockTargetManager) GetCollectionTargetVersion(collectionID int64, scope int32) int64 {
     ret := _m.Called(collectionID, scope)
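Note: the block above is mockery-generated boilerplate over `testify/mock` for the new `CanSegmentBeMoved` method. A hand-written, self-contained sketch of how such a mock is wired and used in a test follows; `MoveGate` and `MockMoveGate` are hypothetical stand-ins, not the generated Milvus types, but the `testify/mock` calls (`On`, `Return`, `Once`, `Called`, `AssertExpectations`) are the real library API.

```go
package demo

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// MoveGate is a trimmed-down slice of the mocked interface: only the new method.
type MoveGate interface {
	CanSegmentBeMoved(collectionID, segmentID int64) bool
}

// MockMoveGate mirrors the shape of the mockery-generated code above.
type MockMoveGate struct {
	mock.Mock
}

func (m *MockMoveGate) CanSegmentBeMoved(collectionID, segmentID int64) bool {
	ret := m.Called(collectionID, segmentID)
	return ret.Get(0).(bool)
}

func TestCanSegmentBeMovedMock(t *testing.T) {
	gate := new(MockMoveGate)
	// Program the expectation, then drive the code under test through the interface.
	gate.On("CanSegmentBeMoved", int64(100), int64(1)).Return(false).Once()

	var g MoveGate = gate
	if g.CanSegmentBeMoved(100, 1) {
		t.Fatal("expected the mocked answer to be false")
	}
	gate.AssertExpectations(t)
}
```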
@@ -71,6 +71,7 @@ type TargetManagerInterface interface {
     IsNextTargetExist(collectionID int64) bool
     SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
     Recover(catalog metastore.QueryCoordCatalog) error
+    CanSegmentBeMoved(collectionID, segmentID int64) bool
 }
 
 type TargetManager struct {
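Note: adding `CanSegmentBeMoved` to `TargetManagerInterface` obliges every implementation, and the generated mock above, to provide it. The sketch below only illustrates how a caller might consult such a gate before planning segment moves; it is not the actual Milvus balancer logic, and all names in it are hypothetical.

```go
package main

import "fmt"

// moveGate is a hypothetical one-method slice of the interface surface.
type moveGate interface {
	CanSegmentBeMoved(collectionID, segmentID int64) bool
}

type segment struct {
	collectionID, segmentID int64
}

// planMoves keeps only the segments the gate allows; illustrative only.
func planMoves(gate moveGate, candidates []segment) []segment {
	var movable []segment
	for _, s := range candidates {
		if gate.CanSegmentBeMoved(s.collectionID, s.segmentID) {
			movable = append(movable, s)
		}
	}
	return movable
}

// evenOnly is a toy implementation used just to exercise planMoves.
type evenOnly struct{}

func (evenOnly) CanSegmentBeMoved(collectionID, segmentID int64) bool {
	return segmentID%2 == 0
}

func main() {
	out := planMoves(evenOnly{}, []segment{{1, 1}, {1, 2}, {1, 3}, {1, 4}})
	fmt.Println(out) // [{1 2} {1 4}]
}
```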
@@ -43,7 +43,7 @@ type CollectionObserver struct {
 
     dist *meta.DistributionManager
     meta *meta.Meta
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     targetObserver *TargetObserver
     checkerController *checkers.CheckerController
     partitionLoadedCount map[int64]int
@@ -62,7 +62,7 @@ type LoadTask struct {
 func NewCollectionObserver(
     dist *meta.DistributionManager,
     meta *meta.Meta,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     targetObserver *TargetObserver,
     checherController *checkers.CheckerController,
 ) *CollectionObserver {
@@ -55,7 +55,7 @@ type TargetObserver struct {
     cancel context.CancelFunc
     wg sync.WaitGroup
     meta *meta.Meta
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     distMgr *meta.DistributionManager
     broker meta.Broker
     cluster session.Cluster
@@ -76,7 +76,7 @@ type TargetObserver struct {
 
 func NewTargetObserver(
     meta *meta.Meta,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     distMgr *meta.DistributionManager,
     broker meta.Broker,
     cluster session.Cluster,
@@ -90,7 +90,7 @@ type Server struct {
     store metastore.QueryCoordCatalog
     meta *meta.Meta
     dist *meta.DistributionManager
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     broker meta.Broker
 
     // Session
@@ -1610,44 +1610,65 @@ func (suite *ServiceSuite) TestGetReplicasWhenNoAvailableNodes() {
 }
 
 func (suite *ServiceSuite) TestCheckHealth() {
     suite.loadAll()
     ctx := context.Background()
     server := suite.server
 
+    assertCheckHealthResult := func(isHealthy bool) {
+        resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
+        suite.NoError(err)
+        suite.Equal(resp.IsHealthy, isHealthy)
+        if !isHealthy {
+            suite.NotEmpty(resp.Reasons)
+        } else {
+            suite.Empty(resp.Reasons)
+        }
+    }
+
+    setNodeSate := func(state commonpb.StateCode) {
+        // Test for components state fail
+        suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Unset()
+        suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(
+            &milvuspb.ComponentStates{
+                State: &milvuspb.ComponentInfo{StateCode: state},
+                Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+            },
+            nil).Maybe()
+    }
+
     // Test for server is not healthy
     server.UpdateStateCode(commonpb.StateCode_Initializing)
-    resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
-    suite.NoError(err)
-    suite.Equal(resp.IsHealthy, false)
-    suite.NotEmpty(resp.Reasons)
+    assertCheckHealthResult(false)
 
     // Test for components state fail
-    for _, node := range suite.nodes {
-        suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return(
-            &milvuspb.ComponentStates{
-                State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Abnormal},
-                Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
-            },
-            nil).Once()
-    }
+    setNodeSate(commonpb.StateCode_Abnormal)
     server.UpdateStateCode(commonpb.StateCode_Healthy)
-    resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
-    suite.NoError(err)
-    suite.Equal(resp.IsHealthy, false)
-    suite.NotEmpty(resp.Reasons)
+    assertCheckHealthResult(false)
 
-    // Test for server is healthy
-    for _, node := range suite.nodes {
-        suite.cluster.EXPECT().GetComponentStates(mock.Anything, node).Return(
-            &milvuspb.ComponentStates{
-                State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy},
-                Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
-            },
-            nil).Once()
+    // Test for check load percentage fail
+    setNodeSate(commonpb.StateCode_Healthy)
+    assertCheckHealthResult(true)
+
+    // Test for check channel ok
+    for _, collection := range suite.collections {
+        suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded)
+        suite.updateChannelDist(collection)
     }
-    resp, err = server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
-    suite.NoError(err)
-    suite.Equal(resp.IsHealthy, true)
-    suite.Empty(resp.Reasons)
+    assertCheckHealthResult(true)
+
+    // Test for check channel fail
+    tm := meta.NewMockTargetManager(suite.T())
+    tm.EXPECT().GetDmChannelsByCollection(mock.Anything, mock.Anything).Return(nil).Maybe()
+    otm := server.targetMgr
+    server.targetMgr = tm
+    assertCheckHealthResult(true)
+
+    // Test for get shard leader fail
+    server.targetMgr = otm
+    for _, node := range suite.nodes {
+        suite.nodeMgr.Suspend(node)
+    }
+    assertCheckHealthResult(true)
 }
 
 func (suite *ServiceSuite) TestGetShardLeaders() {
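Note: the rewritten test folds the repeated response assertions into an `assertCheckHealthResult` closure and swaps the now interface-typed `server.targetMgr` for a mock mid-test. A self-contained sketch of the closure-helper idea follows; the `checkResult` type and `checkHealth` function are hypothetical stand-ins for the CheckHealth response and endpoint.

```go
package demo

import "testing"

// checkResult is a hypothetical stand-in for the CheckHealth response.
type checkResult struct {
	IsHealthy bool
	Reasons   []string
}

// state and checkHealth fake a health endpoint just to drive the helper below.
var state = checkResult{IsHealthy: true}

func checkHealth() checkResult { return state }

func TestAssertionHelper(t *testing.T) {
	// Same idea as assertCheckHealthResult in the rewritten test: fold the
	// repeated NoError/Equal/(Not)Empty assertions into one closure.
	assertCheckHealth := func(wantHealthy bool) {
		t.Helper()
		got := checkHealth()
		if got.IsHealthy != wantHealthy {
			t.Fatalf("IsHealthy = %v, want %v", got.IsHealthy, wantHealthy)
		}
		if wantHealthy && len(got.Reasons) != 0 {
			t.Fatalf("healthy response should carry no reasons, got %v", got.Reasons)
		}
		if !wantHealthy && len(got.Reasons) == 0 {
			t.Fatal("unhealthy response should explain why")
		}
	}

	assertCheckHealth(true)
	state = checkResult{IsHealthy: false, Reasons: []string{"node 1 is abnormal"}}
	assertCheckHealth(false)
}
```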
@@ -57,7 +57,7 @@ type Executor struct {
     meta *meta.Meta
     dist *meta.DistributionManager
     broker meta.Broker
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     cluster session.Cluster
     nodeMgr *session.NodeManager
 
@@ -69,7 +69,7 @@ type Executor struct {
 func NewExecutor(meta *meta.Meta,
     dist *meta.DistributionManager,
     broker meta.Broker,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     cluster session.Cluster,
     nodeMgr *session.NodeManager,
 ) *Executor {
@@ -157,7 +157,7 @@ type taskScheduler struct {
 
     distMgr *meta.DistributionManager
     meta *meta.Meta
-    targetMgr *meta.TargetManager
+    targetMgr meta.TargetManagerInterface
     broker meta.Broker
     cluster session.Cluster
     nodeMgr *session.NodeManager
@@ -177,7 +177,7 @@ type taskScheduler struct {
 func NewScheduler(ctx context.Context,
     meta *meta.Meta,
     distMgr *meta.DistributionManager,
-    targetMgr *meta.TargetManager,
+    targetMgr meta.TargetManagerInterface,
     broker meta.Broker,
     cluster session.Cluster,
     nodeMgr *session.NodeManager,
@@ -19,6 +19,7 @@ package utils
 import (
     "context"
     "fmt"
+    "time"
 
     "go.uber.org/multierr"
     "go.uber.org/zap"
@@ -29,6 +30,7 @@ import (
     "github.com/milvus-io/milvus/internal/querycoordv2/session"
     "github.com/milvus-io/milvus/pkg/log"
     "github.com/milvus-io/milvus/pkg/util/merr"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
 func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
@@ -104,7 +106,7 @@ func checkLoadStatus(m *meta.Meta, collectionID int64) error {
     return nil
 }
 
-func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager,
+func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager,
     nodeMgr *session.NodeManager, collectionID int64, channels map[string]*meta.DmChannel,
 ) ([]*querypb.ShardLeadersList, error) {
     ret := make([]*querypb.ShardLeadersList, 0)
@@ -163,7 +165,7 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr *meta.TargetManager, di
     return ret, nil
 }
 
-func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
+func GetShardLeaders(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, collectionID int64) ([]*querypb.ShardLeadersList, error) {
     if err := checkLoadStatus(m, collectionID); err != nil {
         return nil, err
     }
@@ -179,33 +181,48 @@ func GetShardLeaders(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.Dis
 }
 
 // CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection
-func CheckCollectionsQueryable(m *meta.Meta, targetMgr *meta.TargetManager, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
+func CheckCollectionsQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error {
+    maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
     for _, coll := range m.GetAllCollections() {
-        collectionID := coll.GetCollectionID()
-        if err := checkLoadStatus(m, collectionID); err != nil {
+        err := checkCollectionQueryable(m, targetMgr, dist, nodeMgr, coll)
+        // the collection is not queryable, if meet following conditions:
+        // 1. Some segments are not loaded
+        // 2. Collection is not starting to release
+        // 3. The load percentage has not been updated in the last 5 minutes.
+        if err != nil && m.Exist(coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval {
             return err
         }
-
-        channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
-        if len(channels) == 0 {
-            msg := "loaded collection do not found any channel in target, may be in recovery"
-            err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
-            log.Warn("failed to get channels", zap.Error(err))
-            return err
-        }
-
-        shardList, err := GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels)
-        if err != nil {
-            return err
-        }
-
-        if len(channels) != len(shardList) {
-            return merr.WrapErrCollectionNotFullyLoaded(collectionID, "still have unwatched channels or loaded segments")
-        }
     }
     return nil
 }
 
+// checkCollectionQueryable check all channels are watched and all segments are loaded for this collection
+func checkCollectionQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager, coll *meta.Collection) error {
+    collectionID := coll.GetCollectionID()
+    if err := checkLoadStatus(m, collectionID); err != nil {
+        return err
+    }
+
+    channels := targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)
+    if len(channels) == 0 {
+        msg := "loaded collection do not found any channel in target, may be in recovery"
+        err := merr.WrapErrCollectionOnRecovering(collectionID, msg)
+        log.Warn("failed to get channels", zap.Error(err))
+        return err
+    }
+
+    shardList, err := GetShardLeadersWithChannels(m, targetMgr, dist, nodeMgr, collectionID, channels)
+    if err != nil {
+        return err
+    }
+
+    if len(channels) != len(shardList) {
+        return merr.WrapErrCollectionNotFullyLoaded(collectionID, "still have unwatched channels or loaded segments")
+    }
+
+    return nil
+}
+
 func filterDupLeaders(replicaManager *meta.ReplicaManager, leaders map[int64]*meta.LeaderView) map[int64]*meta.LeaderView {
     type leaderID struct {
         ReplicaID int64
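Note: the behavioral core of the fix is the new guard in `CheckCollectionsQueryable`: a collection that fails the per-collection check is only reported as unhealthy if it still exists in meta (i.e. it is not being released) and its load status has been stale for longer than `updateCollectionLoadStatusInterval`. A self-contained sketch of that condition follows; the `collection` type and `healthErrorFor` function are hypothetical, only the condition mirrors the commit.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// collection is a hypothetical reduction of the fields the new guard actually reads.
type collection struct {
	id        int64
	updatedAt time.Time
}

// healthErrorFor mirrors the commit's condition: only surface the error when the
// collection still exists and its load status has been stale for longer than maxInterval.
// stillExists stands in for meta.Exist, notQueryable for checkCollectionQueryable's error.
func healthErrorFor(coll collection, stillExists bool, notQueryable error, maxInterval time.Duration) error {
	if notQueryable != nil && stillExists && time.Since(coll.updatedAt) >= maxInterval {
		return notQueryable
	}
	return nil
}

func main() {
	maxInterval := 5 * time.Minute
	errUnloaded := errors.New("still have unwatched channels or loaded segments")

	releasing := collection{id: 1, updatedAt: time.Now().Add(-time.Hour)}
	fmt.Println(healthErrorFor(releasing, false, errUnloaded, maxInterval)) // <nil>: being released, ignored

	warmingUp := collection{id: 2, updatedAt: time.Now()}
	fmt.Println(healthErrorFor(warmingUp, true, errUnloaded, maxInterval)) // <nil>: inside the grace window

	stale := collection{id: 3, updatedAt: time.Now().Add(-time.Hour)}
	fmt.Println(healthErrorFor(stale, true, errUnloaded, maxInterval)) // reported as unhealthy
}
```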
@@ -1696,9 +1696,10 @@ type queryCoordConfig struct {
     EnableStoppingBalance      ParamItem `refreshable:"true"`
     ChannelExclusiveNodeFactor ParamItem `refreshable:"true"`
 
-    CollectionObserverInterval        ParamItem `refreshable:"false"`
-    CheckExecutedFlagInterval         ParamItem `refreshable:"false"`
-    CollectionBalanceSegmentBatchSize ParamItem `refreshable:"true"`
+    CollectionObserverInterval         ParamItem `refreshable:"false"`
+    CheckExecutedFlagInterval          ParamItem `refreshable:"false"`
+    CollectionBalanceSegmentBatchSize  ParamItem `refreshable:"true"`
+    UpdateCollectionLoadStatusInterval ParamItem `refreshable:"false"`
 }
 
 func (p *queryCoordConfig) init(base *BaseTable) {
@@ -2093,6 +2094,17 @@ If this parameter is set false, Milvus simply searches the growing segments with
     }
     p.CheckHealthInterval.Init(base.mgr)
 
+    p.UpdateCollectionLoadStatusInterval = ParamItem{
+        Key:          "queryCoord.updateCollectionLoadStatusInterval",
+        Version:      "2.4.7",
+        DefaultValue: "5",
+        PanicIfEmpty: true,
+        Doc:          "5m, max interval of updating collection loaded status for check health",
+        Export:       true,
+    }
+
+    p.UpdateCollectionLoadStatusInterval.Init(base.mgr)
+
     p.CheckHealthRPCTimeout = ParamItem{
         Key:     "queryCoord.checkHealthRPCTimeout",
         Version: "2.2.7",
@@ -302,6 +302,9 @@ func TestComponentParam(t *testing.T) {
     checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt()
     assert.Equal(t, 2000, checkHealthRPCTimeout)
 
+    updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute)
+    assert.Equal(t, updateInterval, time.Minute*5)
+
     assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat())
     params.Save("queryCoord.globalRowCountFactor", "0.4")
     assert.Equal(t, 0.4, Params.GlobalRowCountFactor.GetAsFloat())