fix: clean up old runs when we fail to enqueue it (#18482)
* fix: clean up old runs when we fail to enqueue it We need to make sure the runs are removed if we fail to enqueue the run to be worked. for this we will now add an error message stating why we failed and move the run out of our kv store. * fix: update fail messagepull/18495/head
parent
ff620782eb
commit
22c573df68
|
|
@ -214,8 +214,20 @@ func (e *Executor) createRun(ctx context.Context, id influxdb.ID, scheduledFor t
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p, err := e.createPromise(ctx, r)
|
||||
if err != nil {
|
||||
if err := e.tcs.AddRunLog(ctx, id, r.ID, time.Now().UTC(), fmt.Sprintf("Failed to enqueue run: %s", err.Error())); err != nil {
|
||||
e.log.Error("failed to fail create run: AddRunLog:", zap.Error(err))
|
||||
}
|
||||
if err := e.tcs.UpdateRunState(ctx, id, r.ID, time.Now().UTC(), influxdb.RunFail); err != nil {
|
||||
e.log.Error("failed to fail create run: UpdateRunState:", zap.Error(err))
|
||||
}
|
||||
if _, err := e.tcs.FinishRun(ctx, id, r.ID); err != nil {
|
||||
e.log.Error("failed to fail create run: FinishRun:", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return e.createPromise(ctx, r)
|
||||
return p, err
|
||||
}
|
||||
|
||||
func (e *Executor) startWorker() {
|
||||
|
|
|
|||
|
|
@ -535,6 +535,51 @@ func testErrorHandling(t *testing.T) {
|
|||
*/
|
||||
}
|
||||
|
||||
func TestPromiseFailure(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tes := taskExecutorSystem(t)
|
||||
|
||||
var (
|
||||
script = fmt.Sprintf(fmtTestScript, t.Name())
|
||||
ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
|
||||
span = opentracing.GlobalTracer().StartSpan("test-span")
|
||||
)
|
||||
ctx = opentracing.ContextWithSpan(ctx, span)
|
||||
|
||||
task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := tes.i.DeleteTask(ctx, task.ID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
|
||||
if err == nil {
|
||||
t.Fatal("failed to error on promise create")
|
||||
}
|
||||
|
||||
if promise != nil {
|
||||
t.Fatalf("expected no promise but recieved one: %+v", promise)
|
||||
}
|
||||
|
||||
runs, _, err := tes.i.FindRuns(context.Background(), influxdb.RunFilter{Task: task.ID})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(runs) != 1 {
|
||||
t.Fatalf("expected 1 runs on failed promise: got: %d, %#v", len(runs), runs[0])
|
||||
}
|
||||
|
||||
if runs[0].Status != "failed" {
|
||||
t.Fatal("failed to set failed state")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type taskControlService struct {
|
||||
backend.TaskControlService
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue