From 396a85c92614a537958c921ec00aac24fbb20ced Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 20 Dec 2022 20:33:27 +0800 Subject: [PATCH] Fix the number of executing tasks may break the limit (#21318) Signed-off-by: yah01 --- internal/querycoordv2/server.go | 6 ++++-- internal/querycoordv2/task/executor.go | 17 +++++++++++++---- internal/querycoordv2/task/scheduler.go | 17 ++++++++++++++++- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index c3839af19e..218a266f04 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -252,9 +252,11 @@ func (s *Server) initMeta() error { log.Error("failed to recover collections") return err } - metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(s.meta.GetAll()))) + collections := s.meta.GetAll() + log.Info("recovering collections...", zap.Int64s("collections", collections)) + metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(collections))) - err = s.meta.ReplicaManager.Recover(s.meta.CollectionManager.GetAll()) + err = s.meta.ReplicaManager.Recover(collections) if err != nil { log.Error("failed to recover replicas") return err diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 40c47323ac..38bf74da4f 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -85,17 +85,20 @@ func (ex *Executor) Stop() { // does nothing and returns false if the action is already committed, // returns true otherwise. func (ex *Executor) Execute(task Task, step int) bool { - if ex.executingTaskNum.Load() > Params.QueryCoordCfg.TaskExecutionCap.GetAsInt32() { - return false - } _, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{}) if exist { return false } - ex.executingTaskNum.Inc() + if ex.executingTaskNum.Inc() > Params.QueryCoordCfg.TaskExecutionCap.GetAsInt32() { + ex.executingTasks.Delete(task.ID()) + ex.executingTaskNum.Dec() + return false + } log := log.With( zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int("step", step), zap.Int64("source", task.SourceID()), ) @@ -127,6 +130,7 @@ func (ex *Executor) scheduleRequests() { task := mergeTask.(*LoadSegmentsTask) log.Info("get merge task, process it", zap.Int64("collectionID", task.req.GetCollectionID()), + zap.Int64("replicaID", task.req.GetReplicaID()), zap.String("shard", task.req.GetInfos()[0].GetInsertChannel()), zap.Int64("nodeID", task.req.GetDstNodeID()), zap.Int("taskNum", len(task.tasks)), @@ -157,6 +161,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) { log := log.With( zap.Int64s("taskIDs", taskIDs), zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.String("shard", task.Shard()), zap.Int64s("segmentIDs", segments), zap.Int64("nodeID", action.Node()), @@ -216,6 +221,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { log := log.With( zap.Int64("taskID", task.ID()), zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int64("segmentID", task.segmentID), zap.Int64("node", action.Node()), zap.Int64("source", task.SourceID()), @@ -286,6 +292,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) { log := log.With( zap.Int64("taskID", task.ID()), zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int64("segmentID", task.segmentID), zap.Int64("node", action.Node()), zap.Int64("source", task.SourceID()), @@ -357,6 +364,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error { log := log.With( zap.Int64("taskID", task.ID()), zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.String("channel", task.Channel()), zap.Int64("node", action.Node()), zap.Int64("source", task.SourceID()), @@ -429,6 +437,7 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error { log := log.With( zap.Int64("taskID", task.ID()), zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.String("channel", task.Channel()), zap.Int64("node", action.Node()), zap.Int64("source", task.SourceID()), diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 81a4d4222a..193dba617d 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -327,8 +327,9 @@ func (scheduler *taskScheduler) preAdd(task Task) error { func (scheduler *taskScheduler) promote(task Task) error { log := log.With( - zap.Int64("collectionID", task.CollectionID()), zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int64("source", task.SourceID()), ) err := scheduler.prePromote(task) @@ -562,6 +563,8 @@ func (scheduler *taskScheduler) preProcess(task Task) bool { log := log.With( zap.Int64("taskID", task.ID()), zap.Int32("type", GetTaskType(task)), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int64("source", task.SourceID()), ) @@ -606,6 +609,8 @@ func (scheduler *taskScheduler) preProcess(task Task) bool { func (scheduler *taskScheduler) process(task Task) bool { log := log.With( zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int32("type", GetTaskType(task)), zap.Int64("source", task.SourceID()), ) @@ -656,6 +661,8 @@ func (scheduler *taskScheduler) RemoveByNode(node int64) { func (scheduler *taskScheduler) remove(task Task) { log := log.With( zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int32("taskStatus", task.Status()), ) task.Cancel() @@ -682,6 +689,8 @@ func (scheduler *taskScheduler) remove(task Task) { func (scheduler *taskScheduler) checkCanceled(task Task) bool { log := log.With( zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int64("source", task.SourceID()), ) @@ -698,6 +707,8 @@ func (scheduler *taskScheduler) checkCanceled(task Task) bool { func (scheduler *taskScheduler) checkStale(task Task) bool { log := log.With( zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int64("source", task.SourceID()), ) @@ -733,6 +744,8 @@ func (scheduler *taskScheduler) checkStale(task Task) bool { func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool { log := log.With( zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int64("source", task.SourceID()), ) @@ -776,6 +789,8 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool { func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) bool { log := log.With( zap.Int64("taskID", task.ID()), + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("replicaID", task.ReplicaID()), zap.Int64("source", task.SourceID()), )