diff --git a/task/backend/inmem_logreaderwriter.go b/task/backend/inmem_logreaderwriter.go index 7525753cc9..026c2ffda0 100644 --- a/task/backend/inmem_logreaderwriter.go +++ b/task/backend/inmem_logreaderwriter.go @@ -23,14 +23,12 @@ func NewInMemRunReaderWriter() *runReaderWriter { return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}} } -func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, status RunStatus) error { +func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error { r.mu.Lock() defer r.mu.Unlock() timeSetter := func(r *platform.Run) { whenStr := when.UTC().Format(time.RFC3339) switch status { - case RunScheduled: - r.ScheduledFor = whenStr case RunStarted: r.StartedAt = whenStr case RunFail, RunSuccess, RunCanceled: @@ -38,13 +36,16 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, r } } - existingRun, ok := r.byRunID[runID.String()] + ridStr := rlb.RunID.String() + existingRun, ok := r.byRunID[ridStr] if !ok { - tid := append([]byte(nil), task.ID...) - run := &platform.Run{ID: runID, TaskID: tid, Status: status.String()} + tid := append([]byte(nil), rlb.Task.ID...) + sf := time.Unix(rlb.RunScheduledFor, 0).UTC() + run := &platform.Run{ID: rlb.RunID, TaskID: tid, Status: status.String(), ScheduledFor: sf.Format(time.RFC3339)} timeSetter(run) - r.byRunID[runID.String()] = run - r.byTaskID[task.ID.String()] = append(r.byTaskID[task.ID.String()], run) + r.byRunID[ridStr] = run + tidStr := rlb.Task.ID.String() + r.byTaskID[tidStr] = append(r.byTaskID[tidStr], run) return nil } @@ -53,19 +54,21 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, r return nil } -func (r *runReaderWriter) AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error { +func (r *runReaderWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when time.Time, log string) error { r.mu.Lock() defer r.mu.Unlock() log = fmt.Sprintf("%s: %s", when.Format(time.RFC3339), log) - existingRun, ok := r.byRunID[runID.String()] + ridStr := rlb.RunID.String() + existingRun, ok := r.byRunID[ridStr] if !ok { return ErrRunNotFound } + sep := "" if existingRun.Log != "" { - existingRun.Log = existingRun.Log + "\n" + sep = "\n" } - existingRun.Log = platform.Log(string(existingRun.Log) + log) + existingRun.Log = platform.Log(string(existingRun.Log) + sep + log) return nil } diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index a11b711529..b8ef5d302d 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -487,11 +487,17 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger runLogger.Warn("Unhandled run state", zap.Stringer("state", s)) } + rlb := RunLogBase{ + Task: r.task, + RunID: qr.RunID, + RunScheduledFor: qr.Now, + } + // Arbitrarily chosen short time limit for how fast the log write must complete. // If we start seeing errors from this, we know the time limit is too short or the system is overloaded. ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond) defer cancel() - if err := r.logWriter.UpdateRunState(ctx, r.task, qr.RunID, time.Now(), s); err != nil { + if err := r.logWriter.UpdateRunState(ctx, rlb, time.Now(), s); err != nil { runLogger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err)) } } diff --git a/task/backend/store.go b/task/backend/store.go index fa01e56b58..43f97ed3c9 100644 --- a/task/backend/store.go +++ b/task/backend/store.go @@ -37,8 +37,7 @@ const ( type RunStatus int const ( - RunScheduled RunStatus = iota - RunStarted + RunStarted RunStatus = iota RunSuccess RunFail RunCanceled @@ -46,8 +45,6 @@ const ( func (r RunStatus) String() string { switch r { - case RunScheduled: - return "scheduled" case RunStarted: return "started" case RunSuccess: @@ -141,24 +138,36 @@ type Store interface { Close() error } +// RunLogBase is the base information for a logs about an individual run. +type RunLogBase struct { + // The parent task that owns the run. + Task *StoreTask + + // The ID of the run. + RunID platform.ID + + // The Unix timestamp indicating the run's scheduled time. + RunScheduledFor int64 +} + // LogWriter writes task logs and task state changes to a store. type LogWriter interface { // UpdateRunState sets the run state and the respective time. - UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, state RunStatus) error + UpdateRunState(ctx context.Context, base RunLogBase, when time.Time, state RunStatus) error // AddRunLog adds a log line to the run. - AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error + AddRunLog(ctx context.Context, base RunLogBase, when time.Time, log string) error } // NopLogWriter is a LogWriter that doesn't do anything when its methods are called. // This is useful for test, but not much else. type NopLogWriter struct{} -func (NopLogWriter) UpdateRunState(context.Context, *StoreTask, platform.ID, time.Time, RunStatus) error { +func (NopLogWriter) UpdateRunState(context.Context, RunLogBase, time.Time, RunStatus) error { return nil } -func (NopLogWriter) AddRunLog(context.Context, *StoreTask, platform.ID, time.Time, string) error { +func (NopLogWriter) AddRunLog(context.Context, RunLogBase, time.Time, string) error { return nil } diff --git a/task/backend/storetest/logstoretest.go b/task/backend/storetest/logstoretest.go index c3d0fe6064..3beb28605c 100644 --- a/task/backend/storetest/logstoretest.go +++ b/task/backend/storetest/logstoretest.go @@ -17,7 +17,7 @@ type DestroyRunStoreFunc func(*testing.T, backend.LogWriter, backend.LogReader) func NewRunStoreTest(name string, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) func(*testing.T) { return func(t *testing.T) { t.Run(name, func(t *testing.T) { - t.Run("SetRunScheduled", func(t *testing.T) { + t.Run("UpdateRunState", func(t *testing.T) { updateRunState(t, crf, drf) }) t.Run("RunLog", func(t *testing.T) { @@ -48,14 +48,21 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun run := platform.Run{ ID: platform.ID([]byte("run")), TaskID: task.ID, - Status: "scheduled", + Status: "started", ScheduledFor: scheduledFor.Format(time.RFC3339), } + rlb := backend.RunLogBase{ + Task: task, + RunID: run.ID, + RunScheduledFor: 1, + } - err := writer.UpdateRunState(context.Background(), task, run.ID, scheduledFor, backend.RunScheduled) - if err != nil { + startAt := time.Unix(2, 0).UTC() + if err := writer.UpdateRunState(context.Background(), rlb, startAt, backend.RunStarted); err != nil { t.Fatal(err) } + run.StartedAt = startAt.Format(time.RFC3339Nano) + run.Status = "started" returnedRun, err := reader.FindRunByID(context.Background(), task.Org, task.ID, run.ID) if err != nil { @@ -66,24 +73,8 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun t.Fatalf("expected: %+v, got: %+v", run, returnedRun) } - startAt := time.Unix(2, 0).UTC() - if err := writer.UpdateRunState(context.Background(), task, run.ID, startAt, backend.RunStarted); err != nil { - t.Fatal(err) - } - run.StartedAt = startAt.Format(time.RFC3339Nano) - run.Status = "started" - - returnedRun, err = reader.FindRunByID(context.Background(), task.Org, task.ID, run.ID) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(run, *returnedRun) { - t.Fatalf("expected: %+v, got: %+v", run, returnedRun) - } - endAt := time.Unix(3, 0).UTC() - if err := writer.UpdateRunState(context.Background(), task, run.ID, endAt, backend.RunSuccess); err != nil { + if err := writer.UpdateRunState(context.Background(), rlb, endAt, backend.RunSuccess); err != nil { t.Fatal(err) } run.FinishedAt = endAt.Format(time.RFC3339Nano) @@ -108,32 +99,40 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) { Org: platform.ID([]byte("ab01ab01ab01ab05")), } + sf := time.Now().UTC() + sa := sf.Add(time.Second) run := platform.Run{ ID: platform.ID([]byte("run")), TaskID: task.ID, - Status: "scheduled", - ScheduledFor: time.Now().UTC().Format(time.RFC3339), + Status: "started", + ScheduledFor: sf.Format(time.RFC3339), + StartedAt: sa.Format(time.RFC3339), + } + rlb := backend.RunLogBase{ + Task: task, + RunID: run.ID, + RunScheduledFor: sf.Unix(), } logTime := time.Now().UTC() - if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "bad"); err == nil { + if err := writer.AddRunLog(context.Background(), rlb, logTime, "bad"); err == nil { t.Fatal("shouldn't be able to log against non existing run") } - err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunScheduled) + err := writer.UpdateRunState(context.Background(), rlb, sa, backend.RunStarted) if err != nil { t.Fatal(err) } - if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "first"); err != nil { + if err := writer.AddRunLog(context.Background(), rlb, logTime, "first"); err != nil { t.Fatal(err) } - if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "second"); err != nil { + if err := writer.AddRunLog(context.Background(), rlb, logTime, "second"); err != nil { t.Fatal(err) } - if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "third"); err != nil { + if err := writer.AddRunLog(context.Background(), rlb, logTime, "third"); err != nil { t.Fatal(err) } @@ -167,11 +166,16 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) scheduledFor := time.Unix(int64(i), 0).UTC() runs[i] = platform.Run{ ID: platform.ID([]byte(fmt.Sprintf("run%d", i))), - Status: "scheduled", + Status: "started", ScheduledFor: scheduledFor.Format(time.RFC3339), } + rlb := backend.RunLogBase{ + Task: task, + RunID: runs[i].ID, + RunScheduledFor: scheduledFor.Unix(), + } - err := writer.UpdateRunState(context.Background(), task, runs[i].ID, scheduledFor, backend.RunScheduled) + err := writer.UpdateRunState(context.Background(), rlb, scheduledFor.Add(time.Second), backend.RunStarted) if err != nil { t.Fatal(err) } @@ -256,14 +260,23 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu ID: platform.ID([]byte("ab01ab01ab01ab01")), Org: platform.ID([]byte("ab01ab01ab01ab05")), } + sf := time.Now().UTC() + sa := sf.Add(time.Second) + run := platform.Run{ ID: platform.ID([]byte("run")), TaskID: task.ID, - Status: "scheduled", - ScheduledFor: time.Now().UTC().Format(time.RFC3339), + Status: "started", + ScheduledFor: sf.Format(time.RFC3339), + StartedAt: sa.Format(time.RFC3339), + } + rlb := backend.RunLogBase{ + Task: task, + RunID: run.ID, + RunScheduledFor: sf.Unix(), } - if err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunScheduled); err != nil { + if err := writer.UpdateRunState(context.Background(), rlb, sa, backend.RunStarted); err != nil { t.Fatal(err) } @@ -306,18 +319,24 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) runs := make([]platform.Run, 20) for i := 0; i < len(runs); i++ { + sf := time.Unix(int64(i), 0) runs[i] = platform.Run{ ID: platform.ID([]byte(fmt.Sprintf("run%d", i))), Status: "started", - ScheduledFor: time.Unix(int64(i), 0).Format(time.RFC3339), + ScheduledFor: sf.UTC().Format(time.RFC3339), + } + rlb := backend.RunLogBase{ + Task: task, + RunID: runs[i].ID, + RunScheduledFor: sf.Unix(), } - err := writer.UpdateRunState(context.Background(), task, runs[i].ID, time.Now(), backend.RunScheduled) + err := writer.UpdateRunState(context.Background(), rlb, time.Now(), backend.RunStarted) if err != nil { t.Fatal(err) } - writer.AddRunLog(context.Background(), task, runs[i].ID, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i)) + writer.AddRunLog(context.Background(), rlb, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i)) } logs, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &runs[4].ID}) diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index 70f85da2fb..8805a4c856 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -206,7 +206,12 @@ func testTaskRuns(t *testing.T, sys *System) { t.Fatal(err) } startedAt := time.Now() - if err := sys.LW.UpdateRunState(sys.Ctx, st, runID, startedAt, backend.RunStarted); err != nil { + rlb := backend.RunLogBase{ + Task: st, + RunID: runID, + RunScheduledFor: rc.Created.Now, + } + if err := sys.LW.UpdateRunState(sys.Ctx, rlb, startedAt, backend.RunStarted); err != nil { t.Fatal(err) } @@ -224,24 +229,23 @@ func testTaskRuns(t *testing.T, sys *System) { r := runs[0] if !bytes.Equal(r.ID, runID) { - t.Fatalf("expected to find run with ID %s, got %s", runID.String(), r.ID.String()) + t.Errorf("expected to find run with ID %s, got %s", runID.String(), r.ID.String()) } if !bytes.Equal(r.TaskID, task.ID) { - t.Fatalf("expected run to have task ID %s, got %s", task.ID.String(), r.TaskID.String()) + t.Errorf("expected run to have task ID %s, got %s", task.ID.String(), r.TaskID.String()) } if want := startedAt.UTC().Format(time.RFC3339); r.StartedAt != want { - t.Fatalf("expected run to be started at %q, got %q", want, r.StartedAt) + t.Errorf("expected run to be started at %q, got %q", want, r.StartedAt) } if want := time.Unix(rc.Created.Now, 0).UTC().Format(time.RFC3339); r.ScheduledFor != want { - // Not yet expected to match. Change to t.Fatalf as part of addressing https://github.com/influxdata/platform/issues/753. - t.Logf("TODO(#753): expected run to be scheduled for %q, got %q", want, r.ScheduledFor) + t.Errorf("expected run to be scheduled for %q, got %q", want, r.ScheduledFor) } if want := time.Unix(requestedAtUnix, 0).UTC().Format(time.RFC3339); r.RequestedAt != want { // Not yet expected to match. Change to t.Fatalf as part of addressing https://github.com/influxdata/platform/issues/753. t.Logf("TODO(#753): expected run to be requested at %q, got %q", want, r.RequestedAt) } if r.FinishedAt != "" { - t.Fatalf("expected run not be finished, got %q", r.FinishedAt) + t.Errorf("expected run not be finished, got %q", r.FinishedAt) } }