mirror of https://github.com/milvus-io/milvus.git
fix check segment stale task (#20401)
Signed-off-by: Wei Liu <wei.liu@zilliz.com> Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/20409/head
parent
889eb02172
commit
f925fa7661
|
@ -25,6 +25,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
|
@ -475,7 +476,13 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
|
|||
return true
|
||||
}
|
||||
if task, ok := task.(*SegmentTask); ok {
|
||||
segment := scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget)
|
||||
taskType := GetTaskType(task)
|
||||
var segment *datapb.SegmentInfo
|
||||
if taskType == TaskTypeMove {
|
||||
segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget)
|
||||
} else {
|
||||
segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget)
|
||||
}
|
||||
if segment == nil {
|
||||
continue
|
||||
}
|
||||
|
@ -638,12 +645,21 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool {
|
|||
for _, action := range task.Actions() {
|
||||
switch action.Type() {
|
||||
case ActionTypeGrow:
|
||||
segment := scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget)
|
||||
taskType := GetTaskType(task)
|
||||
var segment *datapb.SegmentInfo
|
||||
if taskType == TaskTypeMove {
|
||||
segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget)
|
||||
} else {
|
||||
segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget)
|
||||
}
|
||||
if segment == nil {
|
||||
log.Warn("task stale due tu the segment to load not exists in targets",
|
||||
zap.Int64("segment", task.segmentID))
|
||||
log.Warn("task stale due to the segment to load not exists in targets",
|
||||
zap.Int64("segment", task.segmentID),
|
||||
zap.Int32("taskType", taskType),
|
||||
)
|
||||
return true
|
||||
}
|
||||
|
||||
replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
|
||||
if replica == nil {
|
||||
log.Warn("task stale due to replica not found")
|
||||
|
|
|
@ -636,10 +636,11 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
|
|||
suite.cluster.EXPECT().ReleaseSegments(mock.Anything, leader, mock.Anything).Return(utils.WrapStatus(commonpb.ErrorCode_Success, ""), nil)
|
||||
|
||||
// Test move segment task
|
||||
suite.dist.ChannelDistManager.Update(leader, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
|
||||
vchannel := &datapb.VchannelInfo{
|
||||
CollectionID: suite.collection,
|
||||
ChannelName: channel.ChannelName,
|
||||
}))
|
||||
}
|
||||
suite.dist.ChannelDistManager.Update(leader, meta.DmChannelFromVChannel(vchannel))
|
||||
view := &meta.LeaderView{
|
||||
ID: leader,
|
||||
CollectionID: suite.collection,
|
||||
|
@ -672,8 +673,9 @@ func (suite *TaskSuite) TestMoveSegmentTask() {
|
|||
err = suite.scheduler.Add(task)
|
||||
suite.NoError(err)
|
||||
}
|
||||
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, suite.collection, int64(1)).Return(nil, segmentInfos, nil)
|
||||
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, suite.collection, int64(1)).Return([]*datapb.VchannelInfo{vchannel}, segmentInfos, nil)
|
||||
suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(1))
|
||||
suite.target.UpdateCollectionCurrentTarget(suite.collection, int64(1))
|
||||
suite.dist.SegmentDistManager.Update(sourceNode, segments...)
|
||||
suite.dist.LeaderViewManager.Update(leader, view)
|
||||
segmentsNum := len(suite.moveSegments)
|
||||
|
|
|
@ -698,7 +698,6 @@ func (p *queryCoordConfig) init(base *BaseTable) {
|
|||
p.initSegmentTaskTimeout()
|
||||
p.initDistPullInterval()
|
||||
p.initLoadTimeoutSeconds()
|
||||
p.initCheckHandoffInterval()
|
||||
p.initEnableActiveStandby()
|
||||
p.initNextTargetSurviveTime()
|
||||
p.initUpdateNextTargetInterval()
|
||||
|
@ -812,15 +811,6 @@ func (p *queryCoordConfig) initLoadTimeoutSeconds() {
|
|||
p.LoadTimeoutSeconds = time.Duration(loadTimeout) * time.Second
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) initCheckHandoffInterval() {
|
||||
interval := p.Base.LoadWithDefault("queryCoord.checkHandoffInterval", "5000")
|
||||
checkHandoffInterval, err := strconv.ParseInt(interval, 10, 64)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.CheckHandoffInterval = time.Duration(checkHandoffInterval) * time.Millisecond
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) initNextTargetSurviveTime() {
|
||||
interval := p.Base.LoadWithDefault("queryCoord.NextTargetSurviveTime", "300")
|
||||
nextTargetSurviveTime, err := strconv.ParseInt(interval, 10, 64)
|
||||
|
@ -831,7 +821,7 @@ func (p *queryCoordConfig) initNextTargetSurviveTime() {
|
|||
}
|
||||
|
||||
func (p *queryCoordConfig) initUpdateNextTargetInterval() {
|
||||
interval := p.Base.LoadWithDefault("queryCoord.UpdateNextTargetInterval", "30")
|
||||
interval := p.Base.LoadWithDefault("queryCoord.UpdateNextTargetInterval", "10")
|
||||
updateNextTargetInterval, err := strconv.ParseInt(interval, 10, 64)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
Loading…
Reference in New Issue