mirror of https://github.com/milvus-io/milvus.git
fix: Check stale should check leader task's leader (#31995)
issue: #30816 pr: #31962 Stale-check rules for leader tasks: a reduce leader task should keep executing until the leader's node goes offline; a grow leader task should keep executing until the leader's node enters the stopping state. This PR checks the leader node's stopping state for grow leader tasks. Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/32073/head
parent
4d7ae52d62
commit
8475a63d72
|
@ -20,7 +20,6 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -122,12 +121,7 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
|
|||
)
|
||||
ret := make([]task.Task, 0)
|
||||
|
||||
// skip set segment on stopping node to leader view
|
||||
aliveNodeDist := lo.Filter(dist, func(s *meta.Segment, _ int) bool {
|
||||
nodeInfo := c.nodeMgr.Get(s.Node)
|
||||
return nodeInfo != nil && nodeInfo.GetState() != session.NodeStateStopping
|
||||
})
|
||||
latestNodeDist := utils.FindMaxVersionSegments(aliveNodeDist)
|
||||
latestNodeDist := utils.FindMaxVersionSegments(dist)
|
||||
for _, s := range latestNodeDist {
|
||||
segment := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst)
|
||||
existInTarget := segment != nil
|
||||
|
|
|
@ -191,6 +191,10 @@ func (action *LeaderAction) Version() typeutil.UniqueID {
|
|||
return action.version
|
||||
}
|
||||
|
||||
func (action *LeaderAction) GetLeaderID() typeutil.UniqueID {
|
||||
return action.leaderID
|
||||
}
|
||||
|
||||
func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool {
|
||||
views := distMgr.LeaderViewManager.GetLeaderView(action.leaderID)
|
||||
view := views[action.Shard()]
|
||||
|
|
|
@ -927,7 +927,7 @@ func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error {
|
|||
for _, action := range task.Actions() {
|
||||
switch action.Type() {
|
||||
case ActionTypeGrow:
|
||||
if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
|
||||
if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok {
|
||||
log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
|
||||
return merr.WrapErrNodeOffline(action.Node())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue