Fix the number of executing tasks possibly exceeding the limit (#21318)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/21351/head
yah01 2022-12-20 20:33:27 +08:00 committed by GitHub
parent d401608899
commit 396a85c926
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 7 deletions

View File

@ -252,9 +252,11 @@ func (s *Server) initMeta() error {
log.Error("failed to recover collections") log.Error("failed to recover collections")
return err return err
} }
metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(s.meta.GetAll()))) collections := s.meta.GetAll()
log.Info("recovering collections...", zap.Int64s("collections", collections))
metrics.QueryCoordNumCollections.WithLabelValues().Set(float64(len(collections)))
err = s.meta.ReplicaManager.Recover(s.meta.CollectionManager.GetAll()) err = s.meta.ReplicaManager.Recover(collections)
if err != nil { if err != nil {
log.Error("failed to recover replicas") log.Error("failed to recover replicas")
return err return err

View File

@ -85,17 +85,20 @@ func (ex *Executor) Stop() {
// does nothing and returns false if the action is already committed, // does nothing and returns false if the action is already committed,
// returns true otherwise. // returns true otherwise.
func (ex *Executor) Execute(task Task, step int) bool { func (ex *Executor) Execute(task Task, step int) bool {
if ex.executingTaskNum.Load() > Params.QueryCoordCfg.TaskExecutionCap.GetAsInt32() {
return false
}
_, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{}) _, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{})
if exist { if exist {
return false return false
} }
ex.executingTaskNum.Inc() if ex.executingTaskNum.Inc() > Params.QueryCoordCfg.TaskExecutionCap.GetAsInt32() {
ex.executingTasks.Delete(task.ID())
ex.executingTaskNum.Dec()
return false
}
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int("step", step), zap.Int("step", step),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
) )
@ -127,6 +130,7 @@ func (ex *Executor) scheduleRequests() {
task := mergeTask.(*LoadSegmentsTask) task := mergeTask.(*LoadSegmentsTask)
log.Info("get merge task, process it", log.Info("get merge task, process it",
zap.Int64("collectionID", task.req.GetCollectionID()), zap.Int64("collectionID", task.req.GetCollectionID()),
zap.Int64("replicaID", task.req.GetReplicaID()),
zap.String("shard", task.req.GetInfos()[0].GetInsertChannel()), zap.String("shard", task.req.GetInfos()[0].GetInsertChannel()),
zap.Int64("nodeID", task.req.GetDstNodeID()), zap.Int64("nodeID", task.req.GetDstNodeID()),
zap.Int("taskNum", len(task.tasks)), zap.Int("taskNum", len(task.tasks)),
@ -157,6 +161,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
log := log.With( log := log.With(
zap.Int64s("taskIDs", taskIDs), zap.Int64s("taskIDs", taskIDs),
zap.Int64("collectionID", task.CollectionID()), zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.String("shard", task.Shard()), zap.String("shard", task.Shard()),
zap.Int64s("segmentIDs", segments), zap.Int64s("segmentIDs", segments),
zap.Int64("nodeID", action.Node()), zap.Int64("nodeID", action.Node()),
@ -216,6 +221,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()), zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("segmentID", task.segmentID), zap.Int64("segmentID", task.segmentID),
zap.Int64("node", action.Node()), zap.Int64("node", action.Node()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
@ -286,6 +292,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()), zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("segmentID", task.segmentID), zap.Int64("segmentID", task.segmentID),
zap.Int64("node", action.Node()), zap.Int64("node", action.Node()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
@ -357,6 +364,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()), zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.String("channel", task.Channel()), zap.String("channel", task.Channel()),
zap.Int64("node", action.Node()), zap.Int64("node", action.Node()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
@ -429,6 +437,7 @@ func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()), zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.String("channel", task.Channel()), zap.String("channel", task.Channel()),
zap.Int64("node", action.Node()), zap.Int64("node", action.Node()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),

View File

@ -327,8 +327,9 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
func (scheduler *taskScheduler) promote(task Task) error { func (scheduler *taskScheduler) promote(task Task) error {
log := log.With( log := log.With(
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
) )
err := scheduler.prePromote(task) err := scheduler.prePromote(task)
@ -562,6 +563,8 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int32("type", GetTaskType(task)), zap.Int32("type", GetTaskType(task)),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
) )
@ -606,6 +609,8 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
func (scheduler *taskScheduler) process(task Task) bool { func (scheduler *taskScheduler) process(task Task) bool {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int32("type", GetTaskType(task)), zap.Int32("type", GetTaskType(task)),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
) )
@ -656,6 +661,8 @@ func (scheduler *taskScheduler) RemoveByNode(node int64) {
func (scheduler *taskScheduler) remove(task Task) { func (scheduler *taskScheduler) remove(task Task) {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int32("taskStatus", task.Status()), zap.Int32("taskStatus", task.Status()),
) )
task.Cancel() task.Cancel()
@ -682,6 +689,8 @@ func (scheduler *taskScheduler) remove(task Task) {
func (scheduler *taskScheduler) checkCanceled(task Task) bool { func (scheduler *taskScheduler) checkCanceled(task Task) bool {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
) )
@ -698,6 +707,8 @@ func (scheduler *taskScheduler) checkCanceled(task Task) bool {
func (scheduler *taskScheduler) checkStale(task Task) bool { func (scheduler *taskScheduler) checkStale(task Task) bool {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
) )
@ -733,6 +744,8 @@ func (scheduler *taskScheduler) checkStale(task Task) bool {
func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool { func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
) )
@ -776,6 +789,8 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) bool {
func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) bool { func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) bool {
log := log.With( log := log.With(
zap.Int64("taskID", task.ID()), zap.Int64("taskID", task.ID()),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("source", task.SourceID()), zap.Int64("source", task.SourceID()),
) )