fix: [2.5] Fix task scheduler dead lock (#40121)

issue: #39101 

master pr: #39084

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/40125/head
cai.zhang 2025-02-23 18:29:55 +08:00 committed by GitHub
parent 9cbab98382
commit 6c81463263
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 6 additions and 2 deletions

View File

@ -343,6 +343,9 @@ func (s *taskScheduler) checkProcessingTasks() {
taskID := taskID
go func(taskID int64) {
defer wg.Done()
defer func() {
<-sem
}()
task := s.getRunningTask(taskID)
s.taskLock.Lock(taskID)
suc := s.checkProcessingTask(task)
@ -350,7 +353,6 @@ func (s *taskScheduler) checkProcessingTasks() {
if suc {
s.removeRunningTask(taskID)
}
<-sem
}(taskID)
}
wg.Wait()
@ -398,6 +400,9 @@ func (s *taskScheduler) run() {
sem <- struct{}{}
go func(task Task, nodeID int64) {
defer wg.Done()
defer func() {
<-sem
}()
s.taskLock.Lock(task.GetTaskID())
s.process(task, nodeID)
@ -413,7 +418,6 @@ func (s *taskScheduler) run() {
s.runningTasks[task.GetTaskID()] = task
s.runningQueueLock.Unlock()
}
<-sem
}(task, nodeID)
}
wg.Wait()