From 22c573df683af6ff2c8c00286bde68ecb00815c2 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Fri, 12 Jun 2020 13:02:31 -0600 Subject: [PATCH] fix: clean up old runs when we fail to enqueue it (#18482) * fix: clean up old runs when we fail to enqueue it We need to make sure the runs are removed if we fail to enqueue the run to be worked. for this we will now add an error message stating why we failed and move the run out of our kv store. * fix: update fail message --- task/backend/executor/executor.go | 14 +++++++- task/backend/executor/executor_test.go | 45 ++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index cd4dfc564b..1862cf1fad 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -214,8 +214,20 @@ func (e *Executor) createRun(ctx context.Context, id influxdb.ID, scheduledFor t if err != nil { return nil, err } + p, err := e.createPromise(ctx, r) + if err != nil { + if err := e.tcs.AddRunLog(ctx, id, r.ID, time.Now().UTC(), fmt.Sprintf("Failed to enqueue run: %s", err.Error())); err != nil { + e.log.Error("failed to fail create run: AddRunLog:", zap.Error(err)) + } + if err := e.tcs.UpdateRunState(ctx, id, r.ID, time.Now().UTC(), influxdb.RunFail); err != nil { + e.log.Error("failed to fail create run: UpdateRunState:", zap.Error(err)) + } + if _, err := e.tcs.FinishRun(ctx, id, r.ID); err != nil { + e.log.Error("failed to fail create run: FinishRun:", zap.Error(err)) + } + } - return e.createPromise(ctx, r) + return p, err } func (e *Executor) startWorker() { diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 3d6a5d6384..96de7efdb5 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -535,6 +535,51 @@ func testErrorHandling(t *testing.T) { */ } +func TestPromiseFailure(t *testing.T) { + t.Parallel() + + tes := taskExecutorSystem(t) + + var ( + script = fmt.Sprintf(fmtTestScript, t.Name()) + ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth) + span = opentracing.GlobalTracer().StartSpan("test-span") + ) + ctx = opentracing.ContextWithSpan(ctx, span) + + task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) + if err != nil { + t.Fatal(err) + } + + if err := tes.i.DeleteTask(ctx, task.ID); err != nil { + t.Fatal(err) + } + + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) + if err == nil { + t.Fatal("failed to error on promise create") + } + + if promise != nil { + t.Fatalf("expected no promise but recieved one: %+v", promise) + } + + runs, _, err := tes.i.FindRuns(context.Background(), influxdb.RunFilter{Task: task.ID}) + if err != nil { + t.Fatal(err) + } + + if len(runs) != 1 { + t.Fatalf("expected 1 runs on failed promise: got: %d, %#v", len(runs), runs[0]) + } + + if runs[0].Status != "failed" { + t.Fatal("failed to set failed state") + } + +} + type taskControlService struct { backend.TaskControlService