fix(task): fix an issue where tasks fail to resume on claim (#14356)
parent
c6f66e1e21
commit
10bfc91562
|
@ -307,8 +307,6 @@ func (s *TickScheduler) ClaimTask(authCtx context.Context, task *platform.Task)
|
|||
return err
|
||||
}
|
||||
|
||||
s.taskSchedulers[task.ID] = ts
|
||||
|
||||
// pickup any runs that are still "running from a previous failure"
|
||||
runs, err := s.taskControlService.CurrentlyRunning(authCtx, task.ID)
|
||||
if err != nil {
|
||||
|
@ -320,6 +318,8 @@ func (s *TickScheduler) ClaimTask(authCtx context.Context, task *platform.Task)
|
|||
}
|
||||
}
|
||||
|
||||
s.taskSchedulers[task.ID] = ts
|
||||
|
||||
next, hasQueue := ts.NextDue()
|
||||
if now := atomic.LoadInt64(&s.now); now >= next || hasQueue {
|
||||
ts.Work()
|
||||
|
@ -509,8 +509,9 @@ func (ts *taskScheduler) Work() {
|
|||
}
|
||||
|
||||
func (ts *taskScheduler) WorkCurrentlyRunning(runs []*platform.Run) error {
|
||||
foundWorker := false
|
||||
|
||||
for _, cr := range runs {
|
||||
foundWorker := false
|
||||
for _, r := range ts.runners {
|
||||
t, err := time.Parse(time.RFC3339, cr.ScheduledFor)
|
||||
if err != nil {
|
||||
|
@ -523,9 +524,10 @@ func (ts *taskScheduler) WorkCurrentlyRunning(runs []*platform.Run) error {
|
|||
}
|
||||
}
|
||||
|
||||
if !foundWorker {
|
||||
return errors.New("worker not found to resume work")
|
||||
}
|
||||
}
|
||||
|
||||
if !foundWorker {
|
||||
return errors.New("worker not found to resume work")
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue