mirror of https://github.com/milvus-io/milvus.git
fix: [10kcp] Fix slow preprocess in qc scheduler (#38784)
Supplement to PR: https://github.com/milvus-io/milvus/pull/38566
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Branch: pull/38794/head
parent 7f5467577e
commit 05f50b11ff
@@ -390,7 +390,7 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error {
 	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60).
 		With(zap.Int64("collectionID", collectionID))
 
-	log.RatedInfo(10, "observer trigger update next target")
+	log.Info("observer trigger update next target")
 	err := ob.targetMgr.UpdateCollectionNextTarget(collectionID)
 	if err != nil {
 		log.Warn("failed to update next target for collection",
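Throughout this file the change swaps the rate-gated log.RatedInfo(10, ...) calls for plain log.Info calls, while the WithRateGroup("qcv2.TargetObserver", 1, 60) setup remains in place. For readers unfamiliar with the rate-gated variant, here is a minimal, self-contained sketch of a token-bucket-gated logger. The ratedLogger type and its fields are hypothetical; this is not milvus's pkg/log implementation, only an illustration of how a (1, 60) rate group with cost-10 messages behaves.

package main

import (
	"fmt"
	"sync"
	"time"
)

// ratedLogger is a hypothetical stand-in for a rate-gated logger: it allows a
// burst worth of `capacity` credit and refills at `perSecond` credit per second.
type ratedLogger struct {
	mu        sync.Mutex
	credit    float64
	capacity  float64
	perSecond float64
	last      time.Time
}

func newRatedLogger(perSecond, capacity float64) *ratedLogger {
	return &ratedLogger{
		credit:    capacity,
		capacity:  capacity,
		perSecond: perSecond,
		last:      time.Now(),
	}
}

// RatedInfo prints the message only if `cost` credit is available; otherwise
// the line is silently dropped. This is the behavior the removed calls rely on.
func (l *ratedLogger) RatedInfo(cost float64, msg string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	// Refill credit based on elapsed time, capped at capacity.
	now := time.Now()
	l.credit += now.Sub(l.last).Seconds() * l.perSecond
	if l.credit > l.capacity {
		l.credit = l.capacity
	}
	l.last = now

	if l.credit < cost {
		return false // rate limited: drop the message
	}
	l.credit -= cost
	fmt.Println("INFO:", msg)
	return true
}

func main() {
	// Roughly mirrors WithRateGroup("qcv2.TargetObserver", 1, 60): 1 credit/s, burst of 60.
	logger := newRatedLogger(1, 60)
	for i := 0; i < 100; i++ {
		// With cost 10, only the first few messages of a burst get through.
		logger.RatedInfo(10, "observer trigger update next target")
	}
}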
@@ -422,7 +422,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 	channelNames := ob.targetMgr.GetDmChannelsByCollection(collectionID, meta.NextTarget)
 	if len(channelNames) == 0 {
 		// next target is empty, no need to update
-		log.RatedInfo(10, "next target is empty, no need to update")
+		log.Info("next target is empty, no need to update")
 		return false
 	}
 
@@ -434,7 +434,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 
 		// to avoid stuck here in dynamic increase replica case, we just check available delegator number
 		if int32(len(channelReadyLeaders)) < replicaNum {
-			log.RatedInfo(10, "channel not ready",
+			log.Info("channel not ready",
 				zap.Int("readyReplicaNum", len(channelReadyLeaders)),
 				zap.String("channelName", channel),
 			)
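The comment in this hunk states the readiness rule: rather than waiting for every replica's delegator, which can stall while replicas are being added dynamically, the observer only requires that at least replicaNum delegators are serving the channel. A tiny stand-alone sketch of that rule follows; the names are illustrative, not the actual querycoord types.

package main

import "fmt"

// channelReady mirrors the check in the hunk above: the channel counts as
// ready once at least replicaNum delegators (shard leaders) serve it, even if
// further replicas are still being brought up.
func channelReady(readyLeaderIDs []int64, replicaNum int32) bool {
	return int32(len(readyLeaderIDs)) >= replicaNum
}

func main() {
	fmt.Println(channelReady([]int64{1, 2}, 3))    // false: only 2 of 3 delegators ready
	fmt.Println(channelReady([]int64{1, 2, 3}, 3)) // true
}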
@@ -573,7 +573,7 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead
 
 func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
 	log := log.Ctx(context.TODO()).WithRateGroup("qcv2.TargetObserver", 1, 60)
-	log.RatedInfo(10, "observer trigger update current target", zap.Int64("collectionID", collectionID))
+	log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID))
 	if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) {
 		ob.mut.Lock()
 		defer ob.mut.Unlock()
@@ -25,7 +25,6 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/pkg/common"
-	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -116,35 +115,7 @@ func (action *SegmentAction) Scope() querypb.DataScope {
 }
 
 func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
-	if action.Type() == ActionTypeGrow {
-		// rpc finished
-		return action.rpcReturned.Load()
-	} else if action.Type() == ActionTypeReduce {
-		// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
-		// loading segment replaces the node ID with the new one,
-		// which confuses the condition of finishing,
-		// the leader should return a map of segment ID to list of nodes,
-		// now, we just always commit the release task to executor once.
-		// NOTE: DO NOT create a task containing release action and the action is not the last action
-		sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()))
-		views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node()))
-		growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 {
-			return lo.Keys(view.GrowingSegments)
-		})
-		segments := make([]int64, 0, len(sealed)+len(growing))
-		for _, segment := range sealed {
-			segments = append(segments, segment.GetID())
-		}
-		segments = append(segments, growing...)
-		if !funcutil.SliceContain(segments, action.SegmentID()) {
-			return true
-		}
-		return action.rpcReturned.Load()
-	} else if action.Type() == ActionTypeUpdate {
-		return action.rpcReturned.Load()
-	}
-
-	return true
+	return action.rpcReturned.Load()
 }
 
 func (action *SegmentAction) String() string {
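This hunk is the substantive change behind the fix: the old IsFinished fetched the node's sealed-segment distribution and leader views and rebuilt a segment-ID list on every call for reduce actions, work that is linear in the number of segments and sits on the scheduler's preprocess path; the new body is a single atomic read of rpcReturned. A minimal sketch of the resulting pattern, with a hypothetical segmentAction type standing in for the real querycoord action:

package main

import (
	"fmt"
	"sync/atomic"
)

// segmentAction is a hypothetical stand-in for the scheduler's action type.
// After this change, the only state the finished-check needs is an atomic
// flag that the executor sets once the load/release RPC has returned.
type segmentAction struct {
	segmentID   int64
	rpcReturned atomic.Bool
}

// IsFinished in the simplified form: an O(1) atomic read instead of scanning
// the node's sealed and growing segment distribution on every call.
func (a *segmentAction) IsFinished() bool {
	return a.rpcReturned.Load()
}

func main() {
	action := &segmentAction{segmentID: 42}
	fmt.Println(action.IsFinished()) // false: RPC has not returned yet

	// The executor flips the flag when the RPC completes.
	action.rpcReturned.Store(true)
	fmt.Println(action.IsFinished()) // true
}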
@@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
 func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error {
 	log := log.Ctx(context.TODO()).
 		WithRateGroup("utils.CheckLeaderAvailable", 1, 60).
-		With(zap.Int64("leaderID", leader.ID))
+		With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID))
 	info := nodeMgr.Get(leader.ID)
 
 	// Check whether leader is online
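The last hunk widens the logger's bound fields: With() now attaches collectionID alongside leaderID, so every subsequent warning emitted while checking that delegator also carries the collection it belongs to. Below is a small stand-alone example of the same pattern using plain go.uber.org/zap rather than milvus's log wrapper; the IDs are made up.

package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	// With returns a child logger that attaches these fields to every entry,
	// so later Warn/Info calls in the function do not need to repeat them.
	log := logger.With(
		zap.Int64("leaderID", 7),
		zap.Int64("collectionID", 451),
	)

	log.Warn("leader is not available") // carries both leaderID and collectionID
}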