Remove task timeout (#19583)

Signed-off-by: yah01 <yang.cen@zilliz.com>

yah01 2022-09-30 19:48:55 +08:00 committed by GitHub
parent 08ade66c85
commit e6c7286bee
10 changed files with 82 additions and 123 deletions


@@ -170,7 +170,7 @@ queryCoord:
distPullInterval: 500
loadTimeoutSeconds: 600
checkHandoffInterval: 5000
-taskMergeCap: 8
+taskMergeCap: 16
enableActiveStandby: false # Enable active-standby
# Related configuration of queryNode, used to run hybrid search between vector and scalar data.


@@ -207,6 +207,7 @@ func (job *LoadCollectionJob) Execute() error {
Status: querypb.LoadStatus_Loading,
},
CreatedAt: time.Now(),
+UpdatedAt: time.Now(),
})
if err != nil {
msg := "failed to store collection"


@@ -15,6 +15,7 @@ type Collection struct {
*querypb.CollectionLoadInfo
LoadPercentage int32
CreatedAt time.Time
+UpdatedAt time.Time
}
func (collection *Collection) Clone() *Collection {
@@ -27,6 +28,7 @@ type Partition struct {
*querypb.PartitionLoadInfo
LoadPercentage int32
CreatedAt time.Time
+UpdatedAt time.Time
}
func (partition *Partition) Clone() *Partition {
@@ -279,6 +281,7 @@ func (m *CollectionManager) putCollection(collection *Collection, withSave bool)
return err
}
}
+collection.UpdatedAt = time.Now()
m.collections[collection.CollectionID] = collection
return nil
@@ -334,6 +337,7 @@ func (m *CollectionManager) putPartition(partitions []*Partition, withSave bool)
}
}
for _, partition := range partitions {
+partition.UpdatedAt = time.Now()
m.partitions[partition.GetPartitionID()] = partition
}
return nil
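
The new UpdatedAt field sits next to CreatedAt and is refreshed every time a collection or partition is persisted (and, in the LoadCollectionJob hunk above, set when the load record is first created). A minimal sketch of the idea, using simplified stand-in types rather than the actual Milvus structs:

```go
package main

import (
	"fmt"
	"time"
)

// Collection is a simplified stand-in for the QueryCoord meta type:
// CreatedAt records when the load was requested, UpdatedAt records the
// last time the load state was saved.
type Collection struct {
	CollectionID   int64
	LoadPercentage int32
	CreatedAt      time.Time
	UpdatedAt      time.Time
}

type CollectionManager struct {
	collections map[int64]*Collection
}

// putCollection refreshes UpdatedAt on every save, so a collection that is
// still making progress never looks idle to the timeout observer.
func (m *CollectionManager) putCollection(c *Collection) {
	c.UpdatedAt = time.Now()
	m.collections[c.CollectionID] = c
}

func main() {
	m := &CollectionManager{collections: make(map[int64]*Collection)}
	c := &Collection{CollectionID: 1, CreatedAt: time.Now()}
	m.putCollection(c)
	fmt.Println("last update:", c.UpdatedAt)
}
```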


@@ -70,7 +70,7 @@ func (ob *CollectionObserver) observeTimeout() {
collections := ob.meta.CollectionManager.GetAllCollections()
for _, collection := range collections {
if collection.GetStatus() != querypb.LoadStatus_Loading ||
-time.Now().Before(collection.CreatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds)) {
+time.Now().Before(collection.UpdatedAt.Add(Params.QueryCoordCfg.LoadTimeoutSeconds)) {
continue
}
@@ -172,6 +172,9 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
updated := collection.Clone()
updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
+if updated.LoadPercentage <= collection.LoadPercentage {
+return
+}
if loadedCount >= len(segmentTargets)+len(channelTargets) {
updated.Status = querypb.LoadStatus_Loaded
ob.meta.CollectionManager.UpdateCollection(updated)
@@ -181,11 +184,9 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
} else {
ob.meta.CollectionManager.UpdateCollectionInMemory(updated)
}
-if updated.LoadPercentage != collection.LoadPercentage {
-log.Info("collection load status updated",
-zap.Int32("loadPercentage", updated.LoadPercentage),
-zap.Int32("collectionStatus", int32(updated.GetStatus())))
-}
+log.Info("collection load status updated",
+zap.Int32("loadPercentage", updated.LoadPercentage),
+zap.Int32("collectionStatus", int32(updated.GetStatus())))
}
func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partition) {
@@ -230,18 +231,21 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
zap.Int("load-segment-count", loadedCount-subChannelCount))
}
-partition = partition.Clone()
-partition.LoadPercentage = int32(loadedCount * 100 / targetNum)
+updated := partition.Clone()
+updated.LoadPercentage = int32(loadedCount * 100 / targetNum)
+if updated.LoadPercentage <= partition.LoadPercentage {
+return
+}
if loadedCount >= len(segmentTargets)+len(channelTargets) {
-partition.Status = querypb.LoadStatus_Loaded
-ob.meta.CollectionManager.PutPartition(partition)
+updated.Status = querypb.LoadStatus_Loaded
+ob.meta.CollectionManager.UpdatePartition(updated)
-elapsed := time.Since(partition.CreatedAt)
+elapsed := time.Since(updated.CreatedAt)
metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds()))
} else {
-ob.meta.CollectionManager.UpdatePartitionInMemory(partition)
+ob.meta.CollectionManager.UpdatePartitionInMemory(updated)
}
log.Info("partition load status updated",
-zap.Int32("loadPercentage", partition.LoadPercentage),
-zap.Int32("partitionStatus", int32(partition.GetStatus())))
+zap.Int32("loadPercentage", updated.LoadPercentage),
+zap.Int32("partitionStatus", int32(updated.GetStatus())))
}
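
With UpdatedAt in place, the observer changes in two ways: the timeout check measures time since the last persisted update instead of time since creation, and updates that do not increase the load percentage are skipped, so UpdatedAt only advances on real progress. A small sketch of both checks, with simplified types and hypothetical helper names:

```go
package main

import (
	"fmt"
	"time"
)

type Collection struct {
	LoadPercentage int32
	UpdatedAt      time.Time
}

// shouldTimeout mirrors the observeTimeout predicate: a loading collection
// only times out after loadTimeout has passed with no persisted progress.
func shouldTimeout(c *Collection, loadTimeout time.Duration, now time.Time) bool {
	return now.After(c.UpdatedAt.Add(loadTimeout))
}

// progressed mirrors the new guard: only strictly increasing load
// percentages are written back, so no-op updates cannot refresh UpdatedAt.
func progressed(prev, cur int32) bool {
	return cur > prev
}

func main() {
	c := &Collection{LoadPercentage: 40, UpdatedAt: time.Now().Add(-15 * time.Minute)}
	fmt.Println(shouldTimeout(c, 600*time.Second, time.Now())) // true: idle longer than 600s
	fmt.Println(progressed(c.LoadPercentage, 40))              // false: drop the no-op update
}
```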


@@ -3,7 +3,6 @@ package task
import (
"context"
"sync"
-"time"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/log"
@@ -14,10 +13,6 @@ import (
"go.uber.org/zap"
)
-const (
-actionTimeout = 120 * time.Second
-)
type actionIndex struct {
Task int64
Step int
@@ -155,9 +150,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
}
log.Info("load segments...")
-ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-status, err := ex.cluster.LoadSegments(ctx, leader, mergeTask.req)
-cancel()
+status, err := ex.cluster.LoadSegments(task.Context(), leader, mergeTask.req)
if err != nil {
log.Warn("failed to load segment, it may be a false failure", zap.Error(err))
return
@@ -196,7 +189,7 @@ func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
// loadSegment commits the request to merger,
// not really executes the request
-func (ex *Executor) loadSegment(task *SegmentTask, step int) {
+func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
action := task.Actions()[step].(*SegmentAction)
log := log.With(
zap.Int64("taskID", task.ID()),
@@ -206,25 +199,26 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
zap.Int64("source", task.SourceID()),
)
-shouldRemoveAction := true
+var err error
defer func() {
-if shouldRemoveAction {
+if err != nil {
+task.SetErr(err)
+task.Cancel()
ex.removeAction(task, step)
}
}()
-ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-defer cancel()
+ctx := task.Context()
schema, err := ex.broker.GetCollectionSchema(ctx, task.CollectionID())
if err != nil {
log.Warn("failed to get schema of collection", zap.Error(err))
-return
+task.SetErr(err)
+return err
}
partitions, err := utils.GetPartitions(ex.meta.CollectionManager, ex.broker, task.CollectionID())
if err != nil {
log.Warn("failed to get partitions of collection", zap.Error(err))
-return
+return err
}
loadMeta := packLoadMeta(
ex.meta.GetLoadType(task.CollectionID()),
@@ -234,13 +228,13 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
segments, err := ex.broker.GetSegmentInfo(ctx, task.SegmentID())
if err != nil || len(segments) == 0 {
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
-return
+return err
}
segment := segments[0]
indexes, err := ex.broker.GetIndexInfo(ctx, task.CollectionID(), segment.GetID())
if err != nil {
log.Warn("failed to get index of segment", zap.Error(err))
-return
+return err
}
loadInfo := utils.PackSegmentLoadInfo(segment, indexes)
@@ -248,9 +242,9 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), segment.GetInsertChannel())
if !ok {
msg := "no shard leader for the segment to execute loading"
-task.SetErr(utils.WrapError(msg, ErrTaskStale))
+err = utils.WrapError(msg, ErrTaskStale)
log.Warn(msg, zap.String("shard", segment.GetInsertChannel()))
-return
+return err
}
log = log.With(zap.Int64("shardLeader", leader))
@@ -258,7 +252,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) {
loadTask := NewLoadSegmentsTask(task, step, req)
ex.merger.Add(loadTask)
log.Info("load segment task committed")
-shouldRemoveAction = false
+return nil
}
func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
@@ -275,8 +269,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
zap.Int64("source", task.SourceID()),
)
-ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-defer cancel()
+ctx := task.Context()
dstNode := action.Node()
req := packReleaseSegmentRequest(task, action)
@@ -334,7 +327,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
}
}
-func (ex *Executor) subDmChannel(task *ChannelTask, step int) {
+func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
defer ex.removeAction(task, step)
action := task.Actions()[step].(*ChannelAction)
@@ -346,18 +339,25 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) {
zap.Int64("source", task.SourceID()),
)
-ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-defer cancel()
+var err error
+defer func() {
+if err != nil {
+task.SetErr(err)
+task.Cancel()
+}
+}()
+ctx := task.Context()
schema, err := ex.broker.GetCollectionSchema(ctx, task.CollectionID())
if err != nil {
log.Warn("failed to get schema of collection")
-return
+return err
}
partitions, err := utils.GetPartitions(ex.meta.CollectionManager, ex.broker, task.CollectionID())
if err != nil {
log.Warn("failed to get partitions of collection")
-return
+return err
}
loadMeta := packLoadMeta(
ex.meta.GetLoadType(task.CollectionID()),
@@ -371,22 +371,24 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) {
if err != nil {
log.Warn("failed to subscribe DmChannel, failed to fill the request with segments",
zap.Error(err))
-return
+return err
}
log.Info("subscribe channel...")
status, err := ex.cluster.WatchDmChannels(ctx, action.Node(), req)
if err != nil {
log.Warn("failed to subscribe DmChannel, it may be a false failure", zap.Error(err))
-return
+return err
}
if status.ErrorCode != commonpb.ErrorCode_Success {
+err = utils.WrapError("failed to subscribe DmChannel", ErrFailedResponse)
log.Warn("failed to subscribe DmChannel", zap.String("reason", status.GetReason()))
-return
+return err
}
log.Info("subscribe DmChannel done")
+return nil
}
-func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) {
+func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
defer ex.removeAction(task, step)
action := task.Actions()[step].(*ChannelAction)
@@ -398,17 +400,26 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) {
zap.Int64("source", task.SourceID()),
)
-ctx, cancel := context.WithTimeout(task.Context(), actionTimeout)
-defer cancel()
+var err error
+defer func() {
+if err != nil {
+task.SetErr(err)
+task.Cancel()
+}
+}()
+ctx := task.Context()
req := packUnsubDmChannelRequest(task, action)
status, err := ex.cluster.UnsubDmChannel(ctx, action.Node(), req)
if err != nil {
log.Warn("failed to unsubscribe DmChannel, it may be a false failure", zap.Error(err))
-return
+return err
}
if status.ErrorCode != commonpb.ErrorCode_Success {
+err = utils.WrapError("failed to unsubscribe DmChannel", ErrFailedResponse)
log.Warn("failed to unsubscribe DmChannel", zap.String("reason", status.GetReason()))
-return
+return err
}
+return nil
}
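
Across the executor, the fixed 120 second actionTimeout is gone: every RPC now runs against the task's own context, and each action helper returns an error that is recorded on the task and cancels it. A condensed sketch of that pattern, using hypothetical task and loadSegment stand-ins rather than the real executor types:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// task is a simplified stand-in for the scheduler's baseTask: its context is
// canceled only when the task itself is canceled, not by a per-action deadline.
type task struct {
	ctx    context.Context
	cancel context.CancelFunc
	err    error
}

func newTask(parent context.Context) *task {
	ctx, cancel := context.WithCancel(parent)
	return &task{ctx: ctx, cancel: cancel}
}

func (t *task) Context() context.Context { return t.ctx }
func (t *task) SetErr(err error)         { t.err = err }
func (t *task) Cancel()                  { t.cancel() }

// loadSegment sketches the new error-propagation pattern: any failure is
// recorded on the task and cancels it, instead of being swallowed after a
// fixed per-action timeout.
func loadSegment(t *task, rpc func(ctx context.Context) error) (err error) {
	defer func() {
		if err != nil {
			t.SetErr(err)
			t.Cancel()
		}
	}()
	// The RPC inherits the task's context directly; no context.WithTimeout here.
	return rpc(t.Context())
}

func main() {
	t := newTask(context.Background())
	err := loadSegment(t, func(ctx context.Context) error {
		return errors.New("no shard leader for the segment")
	})
	fmt.Println(err, t.ctx.Err() != nil) // error recorded, task context canceled
}
```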


@@ -12,7 +12,7 @@ import (
)
// Merger merges tasks with the same mergeID.
-const waitQueueCap = 128
+const waitQueueCap = 256
type Merger[K comparable, R any] struct {
stopCh chan struct{}
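
The merger's wait-queue capacity is doubled from 128 to 256; the constant bounds how many pending merge tasks can queue up before submission has to wait. The tiny sketch below uses a buffered channel to illustrate the bound; the channel-based design and the rejection behavior are assumptions for illustration, not the actual Merger internals:

```go
package main

import "fmt"

// waitQueueCap mirrors the doubled constant above.
const waitQueueCap = 256

func main() {
	// A bounded queue: once waitQueueCap entries are pending, new
	// submissions either block or are rejected, depending on the caller.
	queue := make(chan int, waitQueueCap)
	for i := 0; i < 10; i++ {
		select {
		case queue <- i:
		default:
			fmt.Println("queue full, submission would have to wait")
		}
	}
	fmt.Println("queued:", len(queue))
}
```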


@@ -37,6 +37,8 @@ var (
ErrResourceNotEnough = errors.New("ResourceNotEnough")
ErrTaskQueueFull = errors.New("TaskQueueFull")
+ErrFailedResponse = errors.New("RpcFailed")
)
type Type = int32
@@ -478,7 +480,9 @@ func (scheduler *taskScheduler) process(task Task) bool {
task.SetStatus(TaskStatusSucceeded)
} else if scheduler.checkCanceled(task) {
task.SetStatus(TaskStatusCanceled)
-task.SetErr(ErrTaskCanceled)
+if task.Err() == nil {
+task.SetErr(ErrTaskCanceled)
+}
} else if scheduler.checkStale(task) {
task.SetStatus(TaskStatusStale)
task.SetErr(ErrTaskStale)
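
The scheduler now keeps whatever error the executor already recorded on a canceled task and only falls back to the generic ErrTaskCanceled when nothing more specific is set (the new ErrFailedResponse above is one such specific error). A small sketch of the guard, with illustrative error values:

```go
package main

import (
	"errors"
	"fmt"
)

var errTaskCanceled = errors.New("TaskCanceled")

// resolveCanceledErr mirrors the new check: preserve the executor's original
// error and fall back to a generic cancellation error only when none exists.
func resolveCanceledErr(taskErr error) error {
	if taskErr == nil {
		return errTaskCanceled
	}
	return taskErr
}

func main() {
	fmt.Println(resolveCanceledErr(nil))                        // TaskCanceled
	fmt.Println(resolveCanceledErr(errors.New("RpcFailed")))    // original error kept
}
```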


@@ -75,8 +75,8 @@ type baseTask struct {
step int
}
-func newBaseTask(ctx context.Context, timeout time.Duration, sourceID, collectionID, replicaID UniqueID, shard string) *baseTask {
-ctx, cancel := context.WithTimeout(ctx, timeout)
+func newBaseTask(ctx context.Context, sourceID, collectionID, replicaID UniqueID, shard string) *baseTask {
+ctx, cancel := context.WithCancel(ctx)
return &baseTask{
sourceID: sourceID,
@@ -237,7 +237,7 @@ func NewSegmentTask(ctx context.Context,
}
}
-base := newBaseTask(ctx, timeout, sourceID, collectionID, replicaID, shard)
+base := newBaseTask(ctx, sourceID, collectionID, replicaID, shard)
base.actions = actions
return &SegmentTask{
baseTask: base,
@@ -287,7 +287,7 @@ func NewChannelTask(ctx context.Context,
}
}
-base := newBaseTask(ctx, timeout, sourceID, collectionID, replicaID, channel)
+base := newBaseTask(ctx, sourceID, collectionID, replicaID, channel)
base.actions = actions
return &ChannelTask{
baseTask: base,
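
This is the core of the change: task contexts become cancelable instead of deadline-bound, and the timeout argument disappears from the task constructors. A minimal sketch of the new constructor shape (simplified signature, not the full baseTask):

```go
package main

import (
	"context"
	"fmt"
)

type baseTask struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// newBaseTask sketches the new constructor: context.WithCancel replaces
// context.WithTimeout, so a task ends only on success, cancellation, or the
// observer's UpdatedAt-based inactivity timeout, never a fixed deadline.
func newBaseTask(ctx context.Context) *baseTask {
	ctx, cancel := context.WithCancel(ctx)
	return &baseTask{ctx: ctx, cancel: cancel}
}

func main() {
	t := newBaseTask(context.Background())
	_, hasDeadline := t.ctx.Deadline()
	fmt.Println(hasDeadline) // false: no built-in task timeout anymore
	t.cancel()
	fmt.Println(t.ctx.Err()) // context canceled
}
```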


@@ -4,17 +4,13 @@ import (
"context"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo"
)
func Wait(ctx context.Context, timeout time.Duration, tasks ...Task) error {
@@ -174,64 +170,3 @@ func Wait(ctx context.Context, timeout time.Duration, tasks ...Task) error {
}
return distMgr.GetShardLeader(replica, channel)
}
-func getSegmentDeltaPositions(ctx context.Context, targetMgr *meta.TargetManager, broker meta.Broker, collectionID, partitionID int64, channel string) ([]*internalpb.MsgPosition, error) {
-deltaChannelName, err := funcutil.ConvertChannelName(channel, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
-if err != nil {
-return nil, err
-}
-// vchannels, _, err := broker.GetRecoveryInfo(ctx, collectionID, partitionID)
-// if err != nil {
-// return nil, err
-// }
-deltaChannels := make([]*datapb.VchannelInfo, 0)
-for _, info := range targetMgr.GetDmChannelsByCollection(collectionID) {
-deltaChannelInfo, err := generatDeltaChannelInfo(info.VchannelInfo)
-if err != nil {
-return nil, err
-}
-if deltaChannelInfo.ChannelName == deltaChannelName {
-deltaChannels = append(deltaChannels, deltaChannelInfo)
-}
-}
-deltaChannels = mergeWatchDeltaChannelInfo(deltaChannels)
-return lo.Map(deltaChannels, func(channel *datapb.VchannelInfo, _ int) *internalpb.MsgPosition {
-return channel.GetSeekPosition()
-}), nil
-}
-func generatDeltaChannelInfo(info *datapb.VchannelInfo) (*datapb.VchannelInfo, error) {
-deltaChannelName, err := funcutil.ConvertChannelName(info.ChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
-if err != nil {
-return nil, err
-}
-deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
-deltaChannel.ChannelName = deltaChannelName
-deltaChannel.UnflushedSegmentIds = nil
-deltaChannel.FlushedSegmentIds = nil
-deltaChannel.DroppedSegmentIds = nil
-return deltaChannel, nil
-}
-func mergeWatchDeltaChannelInfo(infos []*datapb.VchannelInfo) []*datapb.VchannelInfo {
-minPositions := make(map[string]int)
-for index, info := range infos {
-_, ok := minPositions[info.ChannelName]
-if !ok {
-minPositions[info.ChannelName] = index
-}
-minTimeStampIndex := minPositions[info.ChannelName]
-if info.SeekPosition.GetTimestamp() < infos[minTimeStampIndex].SeekPosition.GetTimestamp() {
-minPositions[info.ChannelName] = index
-}
-}
-var result []*datapb.VchannelInfo
-for _, index := range minPositions {
-result = append(result, infos[index])
-}
-return result
-}


@@ -683,7 +683,7 @@ func (p *queryCoordConfig) initTaskRetryInterval() {
}
func (p *queryCoordConfig) initTaskMergeCap() {
-p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 8)
+p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 16)
}
func (p *queryCoordConfig) initAutoHandoff() {
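
Finally, the default queryCoord.taskMergeCap moves from 8 to 16, matching the YAML change at the top; the cap bounds how many queued load-segment requests the merger will combine into one batch. A rough sketch of how such a cap limits batching, with hypothetical helper names rather than the real merger code:

```go
package main

import "fmt"

// mergeBatches groups pending requests into chunks of at most taskMergeCap,
// each of which would be sent as a single merged RPC.
func mergeBatches(requests []int, taskMergeCap int) [][]int {
	var batches [][]int
	for len(requests) > 0 {
		n := taskMergeCap
		if len(requests) < n {
			n = len(requests)
		}
		batches = append(batches, requests[:n])
		requests = requests[n:]
	}
	return batches
}

func main() {
	reqs := make([]int, 40)
	fmt.Println(len(mergeBatches(reqs, 8)))  // 5 batches with the old default
	fmt.Println(len(mergeBatches(reqs, 16))) // 3 batches with the new default
}
```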