diff --git a/cmd/influx/task.go b/cmd/influx/task.go index 58e48ce82a..e0e5525d62 100644 --- a/cmd/influx/task.go +++ b/cmd/influx/task.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "time" "github.com/influxdata/flux/repl" "github.com/influxdata/platform" @@ -573,9 +574,8 @@ func taskRunFindF(cmd *cobra.Command, args []string) { w.Flush() } -// RunRetryFlags define the Delete command type RunRetryFlags struct { - id string + taskID, runID string } var runRetryFlags RunRetryFlags @@ -587,8 +587,10 @@ func init() { Run: runRetryF, } - cmd.Flags().StringVarP(&runRetryFlags.id, "id", "i", "", "task id (required)") - cmd.MarkFlagRequired("id") + cmd.Flags().StringVarP(&runRetryFlags.taskID, "task-id", "i", "", "task id (required)") + cmd.Flags().StringVarP(&runRetryFlags.runID, "run-id", "r", "", "run id (required)") + cmd.MarkFlagRequired("task-id") + cmd.MarkFlagRequired("run-id") taskCmd.AddCommand(cmd) } @@ -599,38 +601,21 @@ func runRetryF(cmd *cobra.Command, args []string) { Token: flags.token, } - var id platform.ID - err := id.DecodeFromString(runRetryFlags.id) - if err != nil { + var taskID, runID platform.ID + if err := taskID.DecodeFromString(runRetryFlags.taskID); err != nil { + fmt.Println(err) + os.Exit(1) + } + if err := runID.DecodeFromString(runRetryFlags.runID); err != nil { fmt.Println(err) os.Exit(1) } ctx := context.TODO() - r, err := s.RetryRun(ctx, id) - if err != nil { + if err := s.RetryRun(ctx, taskID, runID, time.Now().Unix()); err != nil { fmt.Println(err) os.Exit(1) } - w := internal.NewTabWriter(os.Stdout) - w.WriteHeaders( - "ID", - "TaskID", - "Status", - "ScheduledFor", - "StartedAt", - "FinishedAt", - "RequestedAt", - ) - w.Write(map[string]interface{}{ - "ID": r.ID, - "TaskID": r.TaskID, - "Status": r.Status, - "ScheduledFor": r.ScheduledFor, - "StartedAt": r.StartedAt, - "FinishedAt": r.FinishedAt, - "RequestedAt": r.RequestedAt, - }) - w.Flush() + fmt.Printf("Retry for task %s's run %s queued.\n", taskID, runID) } diff --git a/http/swagger.yml b/http/swagger.yml index 0669a633d3..60c38569ba 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -2627,12 +2627,8 @@ paths: required: true description: run ID responses: - '200': - description: The newly created retry run - content: - application/json: - schema: - $ref: "#/components/schemas/Run" + '204': + description: retry has been queued default: description: unexpected error content: diff --git a/http/task_service.go b/http/task_service.go index 2eed165c17..6e829555b6 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -10,6 +10,7 @@ import ( "net/url" "path" "strconv" + "time" "github.com/influxdata/platform" pcontext "github.com/influxdata/platform/context" @@ -534,13 +535,13 @@ func decodeGetRunsRequest(ctx context.Context, r *http.Request, orgs platform.Or func (h *TaskHandler) handleGetRun(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - req, err := decodeGetRunRequest(ctx, r, h.OrganizationService) + req, err := decodeGetRunRequest(ctx, r) if err != nil { EncodeError(ctx, err, w) return } - run, err := h.TaskService.FindRunByID(ctx, req.OrgID, req.RunID) + run, err := h.TaskService.FindRunByID(ctx, req.TaskID, req.RunID) if err != nil { EncodeError(ctx, err, w) return @@ -553,36 +554,32 @@ func (h *TaskHandler) handleGetRun(w http.ResponseWriter, r *http.Request) { } type getRunRequest struct { - OrgID platform.ID - RunID platform.ID + TaskID platform.ID + RunID platform.ID } -func decodeGetRunRequest(ctx context.Context, r *http.Request, orgs platform.OrganizationService) (*getRunRequest, error) { +func decodeGetRunRequest(ctx context.Context, r *http.Request) (*getRunRequest, error) { params := httprouter.ParamsFromContext(ctx) - id := params.ByName("rid") - if id == "" { + tid := params.ByName("tid") + if tid == "" { + return nil, kerrors.InvalidDataf("you must provide a task ID") + } + rid := params.ByName("rid") + if rid == "" { return nil, kerrors.InvalidDataf("you must provide a run ID") } - qp := r.URL.Query() - var orgID platform.ID - if orgName := qp.Get("org"); orgName != "" { - o, err := orgs.FindOrganization(ctx, platform.OrganizationFilter{Name: &orgName}) - if err != nil { - return nil, err - } - - orgID = o.ID + var ti, ri platform.ID + if err := ti.DecodeFromString(tid); err != nil { + return nil, err } - - var i platform.ID - if err := i.DecodeFromString(id); err != nil { + if err := ri.DecodeFromString(rid); err != nil { return nil, err } return &getRunRequest{ - RunID: i, - OrgID: orgID, + RunID: ri, + TaskID: ti, }, nil } @@ -641,37 +638,56 @@ func (h *TaskHandler) handleRetryRun(w http.ResponseWriter, r *http.Request) { EncodeError(ctx, err, w) return } + if req.RequestedAt == nil { + now := time.Now().Unix() + req.RequestedAt = &now + } - run, err := h.TaskService.RetryRun(ctx, req.RunID) - if err != nil { + if err := h.TaskService.RetryRun(ctx, req.TaskID, req.RunID, *req.RequestedAt); err != nil { EncodeError(ctx, err, w) return } - if err := encodeResponse(ctx, w, http.StatusOK, run); err != nil { - EncodeError(ctx, err, w) - return - } + w.WriteHeader(http.StatusNoContent) } type retryRunRequest struct { - RunID platform.ID + RunID, TaskID platform.ID + RequestedAt *int64 } func decodeRetryRunRequest(ctx context.Context, r *http.Request) (*retryRunRequest, error) { params := httprouter.ParamsFromContext(ctx) - id := params.ByName("rid") - if id == "" { + tid := params.ByName("tid") + if tid == "" { + return nil, kerrors.InvalidDataf("you must provide a task ID") + } + rid := params.ByName("rid") + if rid == "" { return nil, kerrors.InvalidDataf("you must provide a run ID") } - var i platform.ID - if err := i.DecodeFromString(id); err != nil { + var ti, ri platform.ID + if err := ti.DecodeFromString(tid); err != nil { + return nil, err + } + if err := ri.DecodeFromString(rid); err != nil { return nil, err } + var t *int64 + if ra := r.URL.Query().Get("requestedAt"); ra != "" { + tu, err := strconv.ParseInt(ra, 10, 64) + if err != nil { + return nil, err + } + t = &tu + } + return &retryRunRequest{ - RunID: i, + RunID: ri, + TaskID: ti, + RequestedAt: t, }, nil } @@ -927,13 +943,87 @@ func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([ } // FindRunByID returns a single run of a specific task. -func (t TaskService) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) { - return nil, errors.New("not yet implemented") +func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) { + u, err := newURL(t.Addr, taskIDRunIDPath(taskID, runID)) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + + SetToken(t.Token, req) + + hc := newClient(u.Scheme, t.InsecureSkipVerify) + + resp, err := hc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + if err.Error() == backend.ErrRunNotFound.Error() { + // ErrRunNotFound is expected as part of the FindRunByID contract, + // so return that actual error instead of a different error that looks like it. + return nil, backend.ErrRunNotFound + } + + return nil, err + } + + var r runResponse + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + return nil, err + } + return &r.Run, nil } // RetryRun creates and returns a new run (which is a retry of another run). -func (t TaskService) RetryRun(ctx context.Context, id platform.ID) (*platform.Run, error) { - return nil, errors.New("not yet implemented") +func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID, requestedAt int64) error { + p := path.Join(taskIDRunIDPath(taskID, runID), "retry") + u, err := newURL(t.Addr, p) + if err != nil { + return err + } + + val := url.Values{} + val.Set("requestedAt", strconv.FormatInt(requestedAt, 10)) + u.RawQuery = val.Encode() + + req, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + return err + } + + SetToken(t.Token, req) + + hc := newClient(u.Scheme, t.InsecureSkipVerify) + + resp, err := hc.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + if err.Error() == backend.ErrRunNotFound.Error() { + // ErrRunNotFound is expected as part of the RetryRun contract, + // so return that actual error instead of a different error that looks like it. + return backend.ErrRunNotFound + } + + // RetryAlreadyQueuedError is also part of the contract. + if e := backend.ParseRetryAlreadyQueuedError(err.Error()); e != nil { + return *e + } + + return err + } + + return nil } func cancelPath(taskID, runID platform.ID) string { @@ -975,3 +1065,7 @@ func taskIDPath(id platform.ID) string { func taskIDRunsPath(id platform.ID) string { return path.Join(tasksPath, id.String(), "runs") } + +func taskIDRunIDPath(taskID, runID platform.ID) string { + return path.Join(tasksPath, taskID.String(), "runs", runID.String()) +} diff --git a/task.go b/task.go index c668dfb005..9400d1ed7d 100644 --- a/task.go +++ b/task.go @@ -54,13 +54,14 @@ type TaskService interface { FindRuns(ctx context.Context, filter RunFilter) ([]*Run, int, error) // FindRunByID returns a single run. - FindRunByID(ctx context.Context, orgID, runID ID) (*Run, error) + FindRunByID(ctx context.Context, taskID, runID ID) (*Run, error) // CancelRun cancels a currently running run. CancelRun(ctx context.Context, taskID, runID ID) error // RetryRun creates and returns a new run (which is a retry of another run). - RetryRun(ctx context.Context, id ID) (*Run, error) + // The requestedAt parameter is the Unix timestamp that will be recorded for the retry. + RetryRun(ctx context.Context, taskID, runID ID, requestedAt int64) error } // TaskUpdate represents updates to a task diff --git a/task/backend/error_test.go b/task/backend/error_test.go new file mode 100644 index 0000000000..9283be7d8d --- /dev/null +++ b/task/backend/error_test.go @@ -0,0 +1,16 @@ +package backend_test + +import ( + "testing" + + "github.com/influxdata/platform/task/backend" +) + +func TestParseRetryAlreadyQueuedError(t *testing.T) { + e := backend.RetryAlreadyQueuedError{Start: 1000, End: 2000} + validMsg := e.Error() + + if err := backend.ParseRetryAlreadyQueuedError(validMsg); err == nil || *err != e { + t.Fatalf("%q should have parsed to %v, but got %v", validMsg, e, err) + } +} diff --git a/task/backend/inmem_logreaderwriter.go b/task/backend/inmem_logreaderwriter.go index ea0d9a8fc3..dbeff0c4d5 100644 --- a/task/backend/inmem_logreaderwriter.go +++ b/task/backend/inmem_logreaderwriter.go @@ -11,8 +11,6 @@ import ( "github.com/influxdata/platform" ) -var ErrRunNotFound error = errors.New("run not found") - type runReaderWriter struct { mu sync.RWMutex byTaskID map[string][]*platform.Run diff --git a/task/backend/meta.go b/task/backend/meta.go index 2405c05463..1b35a422b6 100644 --- a/task/backend/meta.go +++ b/task/backend/meta.go @@ -195,6 +195,11 @@ func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64) er // Don't roll over in pathological case of starting at minimum int64. lc = start } + for _, mr := range stm.ManualRuns { + if mr.Start == start && mr.End == end { + return RetryAlreadyQueuedError{Start: start, End: end} + } + } run := &StoreTaskMetaManualRun{ Start: start, End: end, diff --git a/task/backend/meta_test.go b/task/backend/meta_test.go index 27b234e84e..1bd4ed6cfd 100644 --- a/task/backend/meta_test.go +++ b/task/backend/meta_test.go @@ -257,4 +257,25 @@ func TestMeta_ManuallyRunTimeRange(t *testing.T) { if len(stm.ManualRuns) != maxQueueSize { t.Fatalf("expected to be unable to exceed queue size of %d; got %d", maxQueueSize, len(stm.ManualRuns)) } + + // Reset manual runs. + stm.ManualRuns = stm.ManualRuns[:0] + + // Duplicate manual run with single timestamp should be rejected. + if err := stm.ManuallyRunTimeRange(1, 1, 2); err != nil { + t.Fatal(err) + } + if exp, err := (backend.RetryAlreadyQueuedError{Start: 1, End: 1}), stm.ManuallyRunTimeRange(1, 1, 3); err != exp { + t.Fatalf("expected %v, got %v", exp, err) + } + + // Duplicate manual run with time range should be rejected. + if err := stm.ManuallyRunTimeRange(100, 200, 201); err != nil { + t.Fatal(err) + } + if exp, err := (backend.RetryAlreadyQueuedError{Start: 100, End: 200}), stm.ManuallyRunTimeRange(100, 200, 202); err != exp { + t.Fatalf("expected %v, got %v", exp, err) + } + + // Not currently enforcing one way or another when a newly requested time range overlaps with an existing one. } diff --git a/task/backend/store.go b/task/backend/store.go index 056e37599e..e74201d99e 100644 --- a/task/backend/store.go +++ b/task/backend/store.go @@ -27,6 +27,12 @@ var ( // ErrManualQueueFull is returned when a manual run request cannot be completed. ErrManualQueueFull = errors.New("manual queue at capacity") + + // ErrRunNotFound is returned when searching for a run that doesn't exist. + ErrRunNotFound = errors.New("run not found") + + // ErrRunNotFinished is returned when a retry is invalid due to the run not being finished yet. + ErrRunNotFinished = errors.New("run is still in progress") ) type TaskStatus string @@ -84,6 +90,43 @@ func (e RunNotYetDueError) Error() string { return "run not due until " + time.Unix(e.DueAt, 0).UTC().Format(time.RFC3339) } +// RetryAlreadyQueuedError is returned when attempting to retry a run which has not yet completed. +type RetryAlreadyQueuedError struct { + // Unix timestamps matching existing request's start and end. + Start, End int64 +} + +const fmtRetryAlreadyQueued = "previous retry for start=%s end=%s has not yet finished" + +func (e RetryAlreadyQueuedError) Error() string { + return fmt.Sprintf(fmtRetryAlreadyQueued, + time.Unix(e.Start, 0).UTC().Format(time.RFC3339), + time.Unix(e.End, 0).UTC().Format(time.RFC3339), + ) +} + +// ParseRetryAlreadyQueuedError attempts to parse a RetryAlreadyQueuedError from msg. +// If msg is formatted correctly, the resultant error is returned; otherwise it returns nil. +func ParseRetryAlreadyQueuedError(msg string) *RetryAlreadyQueuedError { + var s, e string + n, err := fmt.Sscanf(msg, fmtRetryAlreadyQueued, &s, &e) + if err != nil || n != 2 { + return nil + } + + start, err := time.Parse(time.RFC3339, s) + if err != nil { + return nil + } + + end, err := time.Parse(time.RFC3339, e) + if err != nil { + return nil + } + + return &RetryAlreadyQueuedError{Start: start.Unix(), End: end.Unix()} +} + // RunCreation is returned by CreateNextRun. type RunCreation struct { Created QueuedRun @@ -233,6 +276,7 @@ type LogReader interface { ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) // FindRunByID finds a run given a orgID and runID. + // orgID is necessary to look in the correct system bucket. FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) // ListLogs lists logs for a task or a specified run of a task. diff --git a/task/platform_adapter.go b/task/platform_adapter.go index 947dd3a1c0..df6a2d9c6e 100644 --- a/task/platform_adapter.go +++ b/task/platform_adapter.go @@ -154,12 +154,35 @@ func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*p return runs, len(runs), err } -func (p pAdapter) FindRunByID(ctx context.Context, orgID, id platform.ID) (*platform.Run, error) { - return p.r.FindRunByID(ctx, orgID, id) +func (p pAdapter) FindRunByID(ctx context.Context, taskID, id platform.ID) (*platform.Run, error) { + task, err := p.s.FindTaskByID(ctx, taskID) + if err != nil { + return nil, err + } + return p.r.FindRunByID(ctx, task.Org, id) } -func (p pAdapter) RetryRun(ctx context.Context, id platform.ID) (*platform.Run, error) { - return nil, errors.New("not yet implemented") +func (p pAdapter) RetryRun(ctx context.Context, taskID, id platform.ID, requestedAt int64) error { + task, err := p.s.FindTaskByID(ctx, taskID) + if err != nil { + return err + } + + run, err := p.r.FindRunByID(ctx, task.Org, id) + if err != nil { + return err + } + if run.Status == backend.RunStarted.String() { + return backend.ErrRunNotFinished + } + + scheduledTime, err := time.Parse(time.RFC3339, run.ScheduledFor) + if err != nil { + return err + } + t := scheduledTime.UTC().Unix() + + return p.s.ManuallyRunTimeRange(ctx, run.TaskID, t, t, requestedAt) } func (p pAdapter) CancelRun(ctx context.Context, taskID, runID platform.ID) error { diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index cc1919e41a..9df4ad7d3f 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/influxdata/platform" "github.com/influxdata/platform/snowflake" "github.com/influxdata/platform/task" @@ -93,6 +94,8 @@ type System struct { // It is safe if this returns the same values every time it is called. CredsFunc func() (orgID, userID platform.ID, token string, err error) + // Underlying task service, initialized inside TestTaskService, + // either by instantiating a PlatformAdapter directly or by calling TaskServiceFunc. ts platform.TaskService } @@ -219,73 +222,195 @@ func testTaskCRUD(t *testing.T, sys *System) { func testTaskRuns(t *testing.T, sys *System) { orgID, userID, _ := creds(t, sys) - task := &platform.Task{Organization: orgID, Owner: platform.User{ID: userID}, Flux: fmt.Sprintf(scriptFmt, 0)} - if err := sys.ts.CreateTask(sys.Ctx, task); err != nil { - t.Fatal(err) - } + t.Run("FindRuns and FindRunByID", func(t *testing.T) { + t.Parallel() - const requestedAtUnix = 1000 - if err := sys.S.ManuallyRunTimeRange(sys.Ctx, task.ID, 60, 300, requestedAtUnix); err != nil { - t.Fatal(err) - } + // Script is set to run every minute. The platform adapter is currently hardcoded to schedule after "now", + // which makes timing of runs somewhat difficult. + task := &platform.Task{Organization: orgID, Owner: platform.User{ID: userID}, Flux: fmt.Sprintf(scriptFmt, 0)} + if err := sys.ts.CreateTask(sys.Ctx, task); err != nil { + t.Fatal(err) + } + st, err := sys.S.FindTaskByID(sys.Ctx, task.ID) + if err != nil { + t.Fatal(err) + } - // Create a run. - rc, err := sys.S.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix+1) - if err != nil { - t.Fatal(err) - } - if rc.Created.TaskID != task.ID { - t.Fatalf("unexpected created run: got %s, want %s", rc.Created.TaskID.String(), task.ID.String()) - } - runID := rc.Created.RunID + delta := (2 * time.Minute) + time.Second + requestedAtUnix := time.Now().Add(delta).UTC().Unix() // This should guarantee we can make two runs. - // Set the run state to started. - st, err := sys.S.FindTaskByID(sys.Ctx, task.ID) - if err != nil { - t.Fatal(err) - } - startedAt := time.Now() - rlb := backend.RunLogBase{ - Task: st, - RunID: runID, - RunScheduledFor: rc.Created.Now, - RequestedAt: requestedAtUnix, - } - if err := sys.LW.UpdateRunState(sys.Ctx, rlb, startedAt, backend.RunStarted); err != nil { - t.Fatal(err) - } + rc0, err := sys.S.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix) + if err != nil { + t.Fatal(err) + } + if rc0.Created.TaskID != task.ID { + t.Fatalf("wrong task ID on created task: got %s, want %s", rc0.Created.TaskID, task.ID) + } - // Find runs, to see the started run. - runs, n, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &orgID, Task: &task.ID}) - if err != nil { - t.Fatal(err) - } - if n != len(runs) { - t.Fatalf("expected n=%d, got %d", len(runs), n) - } - if len(runs) != 1 { - t.Fatalf("expected 1 run returned, got %d", len(runs)) - } + startedAt := time.Now().UTC() - r := runs[0] - if r.ID != runID { - t.Errorf("expected to find run with ID %s, got %s", runID.String(), r.ID.String()) - } - if r.TaskID != task.ID { - t.Errorf("expected run to have task ID %s, got %s", task.ID.String(), r.TaskID.String()) - } - if want := startedAt.UTC().Format(time.RFC3339); r.StartedAt != want { - t.Errorf("expected run to be started at %q, got %q", want, r.StartedAt) - } - if want := time.Unix(rc.Created.Now, 0).UTC().Format(time.RFC3339); r.ScheduledFor != want { - t.Errorf("expected run to be scheduled for %q, got %q", want, r.ScheduledFor) - } - if want := time.Unix(requestedAtUnix, 0).UTC().Format(time.RFC3339); r.RequestedAt != want { - t.Errorf("expected run to be requested at %q, got %q", want, r.RequestedAt) - } - if r.FinishedAt != "" { - t.Errorf("expected run not be finished, got %q", r.FinishedAt) - } + // Update the run state to Started; normally the scheduler would do this. + rlb0 := backend.RunLogBase{ + Task: st, + RunID: rc0.Created.RunID, + RunScheduledFor: rc0.Created.Now, + RequestedAt: requestedAtUnix, + } + if err := sys.LW.UpdateRunState(sys.Ctx, rlb0, startedAt, backend.RunStarted); err != nil { + t.Fatal(err) + } + + rc1, err := sys.S.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix) + if err != nil { + t.Fatal(err) + } + if rc1.Created.TaskID != task.ID { + t.Fatalf("wrong task ID on created task: got %s, want %s", rc1.Created.TaskID, task.ID) + } + + // Update the run state to Started; normally the scheduler would do this. + rlb1 := backend.RunLogBase{ + Task: st, + RunID: rc1.Created.RunID, + RunScheduledFor: rc1.Created.Now, + RequestedAt: requestedAtUnix, + } + if err := sys.LW.UpdateRunState(sys.Ctx, rlb1, startedAt, backend.RunStarted); err != nil { + t.Fatal(err) + } + // Mark the second run finished. + if err := sys.S.FinishRun(sys.Ctx, task.ID, rlb1.RunID); err != nil { + t.Fatal(err) + } + if err := sys.LW.UpdateRunState(sys.Ctx, rlb1, startedAt.Add(time.Second), backend.RunSuccess); err != nil { + t.Fatal(err) + } + + runs, _, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &orgID, Task: &task.ID}) + if err != nil { + t.Fatal(err) + } + if len(runs) != 2 { + t.Fatalf("expected 2 runs, got %v", runs) + } + if runs[0].ID != rc0.Created.RunID { + t.Fatalf("retrieved wrong run ID; want %s, got %s", rc0.Created.RunID, runs[0].ID) + } + if exp := startedAt.Format(time.RFC3339); runs[0].StartedAt != exp { + t.Fatalf("unexpectedStartedAt; want %s, got %s", exp, runs[0].StartedAt) + } + if runs[0].Status != backend.RunStarted.String() { + t.Fatalf("unexpected run status; want %s, got %s", backend.RunStarted.String(), runs[0].Status) + } + if runs[0].FinishedAt != "" { + t.Fatalf("expected empty FinishedAt, got %q", runs[0].FinishedAt) + } + + if runs[1].ID != rc1.Created.RunID { + t.Fatalf("retrieved wrong run ID; want %s, got %s", rc1.Created.RunID, runs[1].ID) + } + if runs[1].StartedAt != runs[0].StartedAt { + t.Fatalf("unexpected StartedAt; want %s, got %s", runs[0].StartedAt, runs[1].StartedAt) + } + if runs[1].Status != backend.RunSuccess.String() { + t.Fatalf("unexpected run status; want %s, got %s", backend.RunSuccess.String(), runs[0].Status) + } + if exp := startedAt.Add(time.Second).Format(time.RFC3339); runs[1].FinishedAt != exp { + t.Fatalf("unexpected FinishedAt; want %s, got %s", exp, runs[1].FinishedAt) + } + + foundRun0, err := sys.ts.FindRunByID(sys.Ctx, task.ID, runs[0].ID) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(foundRun0, runs[0]); diff != "" { + t.Fatalf("difference between listed run and found run: %s", diff) + } + + foundRun1, err := sys.ts.FindRunByID(sys.Ctx, task.ID, runs[1].ID) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(foundRun1, runs[1]); diff != "" { + t.Fatalf("difference between listed run and found run: %s", diff) + } + }) + + t.Run("RetryRun", func(t *testing.T) { + t.Parallel() + + // Script is set to run every minute. The platform adapter is currently hardcoded to schedule after "now", + // which makes timing of runs somewhat difficult. + task := &platform.Task{Organization: orgID, Owner: platform.User{ID: userID}, Flux: fmt.Sprintf(scriptFmt, 0)} + if err := sys.ts.CreateTask(sys.Ctx, task); err != nil { + t.Fatal(err) + } + st, err := sys.S.FindTaskByID(sys.Ctx, task.ID) + if err != nil { + t.Fatal(err) + } + + // Non-existent ID should return the right error. + if err := sys.ts.RetryRun(sys.Ctx, task.ID, platform.ID(math.MaxUint64), 0); err != backend.ErrRunNotFound { + t.Errorf("expected retrying run that doesn't exist to return %v, got %v", backend.ErrRunNotFound, err) + } + + delta := time.Minute + (2 * time.Second) + requestedAtUnix := time.Now().Add(delta).UTC().Unix() // This should guarantee we can make a run. + + rc, err := sys.S.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix) + if err != nil { + t.Fatal(err) + } + if rc.Created.TaskID != task.ID { + t.Fatalf("wrong task ID on created task: got %s, want %s", rc.Created.TaskID, task.ID) + } + + startedAt := time.Now().UTC() + + // Update the run state to Started then Failed; normally the scheduler would do this. + rlb := backend.RunLogBase{ + Task: st, + RunID: rc.Created.RunID, + RunScheduledFor: rc.Created.Now, + RequestedAt: requestedAtUnix, + } + if err := sys.LW.UpdateRunState(sys.Ctx, rlb, startedAt, backend.RunStarted); err != nil { + t.Fatal(err) + } + if err := sys.S.FinishRun(sys.Ctx, task.ID, rlb.RunID); err != nil { + t.Fatal(err) + } + if err := sys.LW.UpdateRunState(sys.Ctx, rlb, startedAt.Add(time.Second), backend.RunFail); err != nil { + t.Fatal(err) + } + + // Now retry the run. + if err := sys.ts.RetryRun(sys.Ctx, task.ID, rlb.RunID, requestedAtUnix); err != nil { + t.Fatal(err) + } + // Ensure the retry is added on the store task meta. + meta, err := sys.S.FindTaskMetaByID(sys.Ctx, task.ID) + if err != nil { + t.Fatal(err) + } + + found := false + for _, mr := range meta.ManualRuns { + if mr.Start == mr.End && mr.Start == rc.Created.Now && mr.RequestedAt == requestedAtUnix { + found = true + break + } + } + if !found { + t.Fatalf("didn't find matching manual run after successful RetryRun call; got: %v", meta.ManualRuns) + } + + // Retrying a run which has been queued but not started, should be rejected. + if exp, err := (backend.RetryAlreadyQueuedError{Start: rc.Created.Now, End: rc.Created.Now}), sys.ts.RetryRun(sys.Ctx, task.ID, rlb.RunID, requestedAtUnix); err != exp { + t.Fatalf("subsequent retry should have been rejected with %v; got %v", exp, err) + } + }) } func testTaskConcurrency(t *testing.T, sys *System) {