diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index cd4dfc564b..1862cf1fad 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -214,8 +214,20 @@ func (e *Executor) createRun(ctx context.Context, id influxdb.ID, scheduledFor t if err != nil { return nil, err } + p, err := e.createPromise(ctx, r) + if err != nil { + if err := e.tcs.AddRunLog(ctx, id, r.ID, time.Now().UTC(), fmt.Sprintf("Failed to enqueue run: %s", err.Error())); err != nil { + e.log.Error("failed to fail create run: AddRunLog:", zap.Error(err)) + } + if err := e.tcs.UpdateRunState(ctx, id, r.ID, time.Now().UTC(), influxdb.RunFail); err != nil { + e.log.Error("failed to fail create run: UpdateRunState:", zap.Error(err)) + } + if _, err := e.tcs.FinishRun(ctx, id, r.ID); err != nil { + e.log.Error("failed to fail create run: FinishRun:", zap.Error(err)) + } + } - return e.createPromise(ctx, r) + return p, err } func (e *Executor) startWorker() { diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 3d6a5d6384..96de7efdb5 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -535,6 +535,51 @@ func testErrorHandling(t *testing.T) { */ } +func TestPromiseFailure(t *testing.T) { + t.Parallel() + + tes := taskExecutorSystem(t) + + var ( + script = fmt.Sprintf(fmtTestScript, t.Name()) + ctx = icontext.SetAuthorizer(context.Background(), tes.tc.Auth) + span = opentracing.GlobalTracer().StartSpan("test-span") + ) + ctx = opentracing.ContextWithSpan(ctx, span) + + task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) + if err != nil { + t.Fatal(err) + } + + if err := tes.i.DeleteTask(ctx, task.ID); err != nil { + t.Fatal(err) + } + + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) + if err == nil { + t.Fatal("failed to error on promise create") + } + + if promise != nil { + t.Fatalf("expected no promise but recieved one: %+v", promise) + } + + runs, _, err := tes.i.FindRuns(context.Background(), influxdb.RunFilter{Task: task.ID}) + if err != nil { + t.Fatal(err) + } + + if len(runs) != 1 { + t.Fatalf("expected 1 runs on failed promise: got: %d, %#v", len(runs), runs[0]) + } + + if runs[0].Status != "failed" { + t.Fatal("failed to set failed state") + } + +} + type taskControlService struct { backend.TaskControlService