mirror of https://github.com/milvus-io/milvus.git
fix: Check stale should check leader task's leader id (#31962)
issue: #30816 check stale rules for leader task: 1. for reduce leader task, it should keep executing until leader's node become offline. 2. for grow leader task,it should keep executing until leader's node become stopping. This PR check leader node's stopping state for grow leader task Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/32064/head
parent
5b693c466d
commit
177ddda47f
|
@ -20,7 +20,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/samber/lo"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||||
|
@ -122,12 +121,7 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
|
||||||
)
|
)
|
||||||
ret := make([]task.Task, 0)
|
ret := make([]task.Task, 0)
|
||||||
|
|
||||||
// skip set segment on stopping node to leader view
|
latestNodeDist := utils.FindMaxVersionSegments(dist)
|
||||||
aliveNodeDist := lo.Filter(dist, func(s *meta.Segment, _ int) bool {
|
|
||||||
nodeInfo := c.nodeMgr.Get(s.Node)
|
|
||||||
return nodeInfo != nil && nodeInfo.GetState() != session.NodeStateStopping
|
|
||||||
})
|
|
||||||
latestNodeDist := utils.FindMaxVersionSegments(aliveNodeDist)
|
|
||||||
for _, s := range latestNodeDist {
|
for _, s := range latestNodeDist {
|
||||||
segment := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst)
|
segment := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst)
|
||||||
existInTarget := segment != nil
|
existInTarget := segment != nil
|
||||||
|
|
|
@ -191,6 +191,10 @@ func (action *LeaderAction) Version() typeutil.UniqueID {
|
||||||
return action.version
|
return action.version
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (action *LeaderAction) GetLeaderID() typeutil.UniqueID {
|
||||||
|
return action.leaderID
|
||||||
|
}
|
||||||
|
|
||||||
func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool {
|
func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool {
|
||||||
views := distMgr.LeaderViewManager.GetLeaderView(action.leaderID)
|
views := distMgr.LeaderViewManager.GetLeaderView(action.leaderID)
|
||||||
view := views[action.Shard()]
|
view := views[action.Shard()]
|
||||||
|
|
|
@ -966,7 +966,7 @@ func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error {
|
||||||
for _, action := range task.Actions() {
|
for _, action := range task.Actions() {
|
||||||
switch action.Type() {
|
switch action.Type() {
|
||||||
case ActionTypeGrow:
|
case ActionTypeGrow:
|
||||||
if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
|
if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok {
|
||||||
log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
|
log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
|
||||||
return merr.WrapErrNodeOffline(action.Node())
|
return merr.WrapErrNodeOffline(action.Node())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue