fix(task): recover when call to execute fails
This was a missed case from #11817. The case currently occurs when creating a task through the UI, which uses a session rather than a full-fledged authorization. This change doesn't fix that case yet, but it will at least log an informative message.
parent 954cea93bc
commit 12292a4983
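For context, the failure injection added to the mock executor below follows a common one-shot forced-error pattern: arm an error under a mutex, return it from the next call, then clear it. The snippet here is only an illustrative, self-contained sketch of that pattern; the names (failingExec, FailNext, Do) are invented for the example and are not part of this codebase.

package main

import (
    "errors"
    "fmt"
    "sync"
)

// failingExec is an illustrative stand-in for a mock with a forced-error hook:
// the next call to Do returns the armed error exactly once, then behavior resets.
type failingExec struct {
    mu      sync.Mutex
    nextErr error
}

// FailNext arms a one-shot error for the next call to Do.
func (e *failingExec) FailNext(err error) {
    e.mu.Lock()
    e.nextErr = err
    e.mu.Unlock()
}

// Do consumes the armed error, if any, before doing any real work.
func (e *failingExec) Do() error {
    e.mu.Lock()
    if err := e.nextErr; err != nil {
        e.nextErr = nil // one-shot: clear before returning
        e.mu.Unlock()
        return err
    }
    e.mu.Unlock()
    return nil
}

func main() {
    var e failingExec
    e.FailNext(errors.New("forced failure"))
    fmt.Println(e.Do()) // forced failure
    fmt.Println(e.Do()) // <nil> — subsequent calls succeed again
}

Checking and clearing the error under the same lock that guards it keeps the hook safe when the call under test happens on another goroutine, which is the situation the scheduler test below exercises.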
@@ -634,9 +634,14 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
     defer sp.Finish()
 
     rp, err := r.executor.Execute(spCtx, qr)
     if err != nil {
-        // TODO(mr): retry? and log error.
+        runLogger.Info("Failed to begin run execution", zap.Error(err))
+        if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
+            // TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
+            runLogger.Error("Beginning run execution failed, and desired state update failed", zap.Error(err))
+        }
+
+        // TODO(mr): retry?
         atomic.StoreUint32(r.state, runnerIdle)
         r.updateRunState(qr, RunFail, runLogger)
         return
@@ -675,7 +680,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
         runLogger.Info("Failed to wait for execution result", zap.Error(err))
         if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
             // TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
-            runLogger.Error("Waiting for execution result failed, and desired update failed", zap.Error(err))
+            runLogger.Error("Waiting for execution result failed, and desired state update failed", zap.Error(err))
         }
 
         // TODO(mr): retry?
@@ -687,7 +692,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
         runLogger.Info("Run failed to execute", zap.Error(err))
         if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
             // TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
-            runLogger.Error("Run failed to execute, and desired update failed", zap.Error(err))
+            runLogger.Error("Run failed to execute, and desired state update failed", zap.Error(err))
         }
         // TODO(mr): retry?
         r.updateRunState(qr, RunFail, runLogger)
@@ -585,8 +585,28 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
         t.Fatal(err)
     }
 
-    // One more tick just to ensure that we can keep going after this type of failure too.
+    // Fail to execute next run.
+    if n := d.TotalRunsCreatedForTask(task.ID); n != 2 {
+        t.Fatalf("should have created 2 runs so far, got %d", n)
+    }
+    e.FailNextCallToExecute(errors.New("forced failure on Execute"))
     s.Tick(8)
+    // The execution happens in the background, so check a few times for 3 runs created.
+    const attempts = 50
+    for i := 0; i < attempts; i++ {
+        time.Sleep(2 * time.Millisecond)
+        n := d.TotalRunsCreatedForTask(task.ID)
+        if n == 3 {
+            break
+        }
+        if i == attempts-1 {
+            // Fail if we haven't seen the right count by the last attempt.
+            t.Fatalf("expected 3 runs created, got %d", n)
+        }
+    }
+
+    // One more tick just to ensure that we can keep going after this type of failure too.
+    s.Tick(9)
     _, err = e.PollForNumberRunning(task.ID, 1)
     if err != nil {
         t.Fatal(err)
@@ -172,15 +172,19 @@ type DesiredState struct
 
     // Map of stringified task ID to task meta.
     meta map[string]backend.StoreTaskMeta
+
+    // Map of task ID to total number of runs created for that task.
+    totalRunsCreated map[platform.ID]int
 }
 
 var _ backend.DesiredState = (*DesiredState)(nil)
 
 func NewDesiredState() *DesiredState {
     return &DesiredState{
-        runIDs:  make(map[string]uint64),
-        created: make(map[string]backend.QueuedRun),
-        meta:    make(map[string]backend.StoreTaskMeta),
+        runIDs:           make(map[string]uint64),
+        created:          make(map[string]backend.QueuedRun),
+        meta:             make(map[string]backend.StoreTaskMeta),
+        totalRunsCreated: make(map[platform.ID]int),
     }
 }
@@ -221,6 +225,7 @@ func (d *DesiredState) CreateNextRun(_ context.Context, taskID platform.ID, now
     d.meta[tid] = meta
     rc.Created.TaskID = taskID
     d.created[tid+rc.Created.RunID.String()] = rc.Created
+    d.totalRunsCreated[taskID]++
     return rc, nil
 }
 
@@ -257,7 +262,15 @@ func (d *DesiredState) CreatedFor(taskID platform.ID) []backend.QueuedRun {
     return qrs
 }
 
-// PollForNumberCreated blocks for a small amount of time waiting for exactly the given count of created runs for the given task ID.
+// TotalRunsCreatedForTask returns the number of runs created for taskID.
+func (d *DesiredState) TotalRunsCreatedForTask(taskID platform.ID) int {
+    d.mu.Lock()
+    defer d.mu.Unlock()
+
+    return d.totalRunsCreated[taskID]
+}
+
+// PollForNumberCreated blocks for a small amount of time waiting for exactly the given count of created and unfinished runs for the given task ID.
 // If the expected number isn't found in time, it returns an error.
 //
 // Because the scheduler and executor do a lot of state changes asynchronously, this is useful in test.
@@ -273,7 +286,7 @@ func (d *DesiredState) PollForNumberCreated(taskID platform.ID, count int) ([]sc
             return created, nil
         }
     }
-    return created, fmt.Errorf("did not see count of %d created task(s) for ID %s in time, instead saw %d", count, taskID.String(), actualCount) // we return created anyways, to make it easier to debug
+    return created, fmt.Errorf("did not see count of %d created run(s) for task with ID %s in time, instead saw %d", count, taskID.String(), actualCount) // we return created anyways, to make it easier to debug
 }
 
 type Executor struct {
@@ -286,6 +299,9 @@ type Executor struct {
     // Map of stringified, concatenated task and run ID, to results of runs that have executed and completed.
     finished map[string]backend.RunResult
 
+    // Forced error for next call to Execute.
+    nextExecuteErr error
+
     wg sync.WaitGroup
 }
 
@@ -303,6 +319,11 @@ func (e *Executor) Execute(ctx context.Context, run backend.QueuedRun) (backend.
     rp.WithHanging(ctx, e.hangingFor)
     id := run.TaskID.String() + run.RunID.String()
     e.mu.Lock()
+    if err := e.nextExecuteErr; err != nil {
+        e.nextExecuteErr = nil
+        e.mu.Unlock()
+        return nil, err
+    }
     e.running[id] = rp
     e.mu.Unlock()
     e.wg.Add(1)
@@ -321,6 +342,13 @@ func (e *Executor) Wait() {
     e.wg.Wait()
 }
 
+// FailNextCallToExecute causes the next call to e.Execute to unconditionally return err.
+func (e *Executor) FailNextCallToExecute(err error) {
+    e.mu.Lock()
+    e.nextExecuteErr = err
+    e.mu.Unlock()
+}
+
 func (e *Executor) WithHanging(dt time.Duration) {
     e.hangingFor = dt
 }