chore(task): clean up log writer API

Introduce a RunLogBase type that encapsulates the base information
shared across multiple logs for the same run. This has the information
previously part of the API (Task and RunID), and includes the addition
of a RunScheduledFor timestamp.

Also remove the RunScheduled value for RunStatus, as it was never used
in the scheduler.
pull/10616/head
Mark Rushakoff 2018-09-06 14:55:55 -07:00 committed by Mark Rushakoff
parent 46963152cc
commit 7bbd087980
5 changed files with 105 additions and 64 deletions

View File

@ -23,14 +23,12 @@ func NewInMemRunReaderWriter() *runReaderWriter {
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}} return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}}
} }
func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, status RunStatus) error { func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
timeSetter := func(r *platform.Run) { timeSetter := func(r *platform.Run) {
whenStr := when.UTC().Format(time.RFC3339) whenStr := when.UTC().Format(time.RFC3339)
switch status { switch status {
case RunScheduled:
r.ScheduledFor = whenStr
case RunStarted: case RunStarted:
r.StartedAt = whenStr r.StartedAt = whenStr
case RunFail, RunSuccess, RunCanceled: case RunFail, RunSuccess, RunCanceled:
@ -38,13 +36,16 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, r
} }
} }
existingRun, ok := r.byRunID[runID.String()] ridStr := rlb.RunID.String()
existingRun, ok := r.byRunID[ridStr]
if !ok { if !ok {
tid := append([]byte(nil), task.ID...) tid := append([]byte(nil), rlb.Task.ID...)
run := &platform.Run{ID: runID, TaskID: tid, Status: status.String()} sf := time.Unix(rlb.RunScheduledFor, 0).UTC()
run := &platform.Run{ID: rlb.RunID, TaskID: tid, Status: status.String(), ScheduledFor: sf.Format(time.RFC3339)}
timeSetter(run) timeSetter(run)
r.byRunID[runID.String()] = run r.byRunID[ridStr] = run
r.byTaskID[task.ID.String()] = append(r.byTaskID[task.ID.String()], run) tidStr := rlb.Task.ID.String()
r.byTaskID[tidStr] = append(r.byTaskID[tidStr], run)
return nil return nil
} }
@ -53,19 +54,21 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, r
return nil return nil
} }
func (r *runReaderWriter) AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error { func (r *runReaderWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when time.Time, log string) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
log = fmt.Sprintf("%s: %s", when.Format(time.RFC3339), log) log = fmt.Sprintf("%s: %s", when.Format(time.RFC3339), log)
existingRun, ok := r.byRunID[runID.String()] ridStr := rlb.RunID.String()
existingRun, ok := r.byRunID[ridStr]
if !ok { if !ok {
return ErrRunNotFound return ErrRunNotFound
} }
sep := ""
if existingRun.Log != "" { if existingRun.Log != "" {
existingRun.Log = existingRun.Log + "\n" sep = "\n"
} }
existingRun.Log = platform.Log(string(existingRun.Log) + log) existingRun.Log = platform.Log(string(existingRun.Log) + sep + log)
return nil return nil
} }

View File

@ -487,11 +487,17 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger
runLogger.Warn("Unhandled run state", zap.Stringer("state", s)) runLogger.Warn("Unhandled run state", zap.Stringer("state", s))
} }
rlb := RunLogBase{
Task: r.task,
RunID: qr.RunID,
RunScheduledFor: qr.Now,
}
// Arbitrarily chosen short time limit for how fast the log write must complete. // Arbitrarily chosen short time limit for how fast the log write must complete.
// If we start seeing errors from this, we know the time limit is too short or the system is overloaded. // If we start seeing errors from this, we know the time limit is too short or the system is overloaded.
ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond) ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond)
defer cancel() defer cancel()
if err := r.logWriter.UpdateRunState(ctx, r.task, qr.RunID, time.Now(), s); err != nil { if err := r.logWriter.UpdateRunState(ctx, rlb, time.Now(), s); err != nil {
runLogger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err)) runLogger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err))
} }
} }

View File

@ -37,8 +37,7 @@ const (
type RunStatus int type RunStatus int
const ( const (
RunScheduled RunStatus = iota RunStarted RunStatus = iota
RunStarted
RunSuccess RunSuccess
RunFail RunFail
RunCanceled RunCanceled
@ -46,8 +45,6 @@ const (
func (r RunStatus) String() string { func (r RunStatus) String() string {
switch r { switch r {
case RunScheduled:
return "scheduled"
case RunStarted: case RunStarted:
return "started" return "started"
case RunSuccess: case RunSuccess:
@ -141,24 +138,36 @@ type Store interface {
Close() error Close() error
} }
// RunLogBase is the base information for a logs about an individual run.
type RunLogBase struct {
// The parent task that owns the run.
Task *StoreTask
// The ID of the run.
RunID platform.ID
// The Unix timestamp indicating the run's scheduled time.
RunScheduledFor int64
}
// LogWriter writes task logs and task state changes to a store. // LogWriter writes task logs and task state changes to a store.
type LogWriter interface { type LogWriter interface {
// UpdateRunState sets the run state and the respective time. // UpdateRunState sets the run state and the respective time.
UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, state RunStatus) error UpdateRunState(ctx context.Context, base RunLogBase, when time.Time, state RunStatus) error
// AddRunLog adds a log line to the run. // AddRunLog adds a log line to the run.
AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error AddRunLog(ctx context.Context, base RunLogBase, when time.Time, log string) error
} }
// NopLogWriter is a LogWriter that doesn't do anything when its methods are called. // NopLogWriter is a LogWriter that doesn't do anything when its methods are called.
// This is useful for test, but not much else. // This is useful for test, but not much else.
type NopLogWriter struct{} type NopLogWriter struct{}
func (NopLogWriter) UpdateRunState(context.Context, *StoreTask, platform.ID, time.Time, RunStatus) error { func (NopLogWriter) UpdateRunState(context.Context, RunLogBase, time.Time, RunStatus) error {
return nil return nil
} }
func (NopLogWriter) AddRunLog(context.Context, *StoreTask, platform.ID, time.Time, string) error { func (NopLogWriter) AddRunLog(context.Context, RunLogBase, time.Time, string) error {
return nil return nil
} }

View File

@ -17,7 +17,7 @@ type DestroyRunStoreFunc func(*testing.T, backend.LogWriter, backend.LogReader)
func NewRunStoreTest(name string, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) func(*testing.T) { func NewRunStoreTest(name string, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) func(*testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
t.Run("SetRunScheduled", func(t *testing.T) { t.Run("UpdateRunState", func(t *testing.T) {
updateRunState(t, crf, drf) updateRunState(t, crf, drf)
}) })
t.Run("RunLog", func(t *testing.T) { t.Run("RunLog", func(t *testing.T) {
@ -48,14 +48,21 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
run := platform.Run{ run := platform.Run{
ID: platform.ID([]byte("run")), ID: platform.ID([]byte("run")),
TaskID: task.ID, TaskID: task.ID,
Status: "scheduled", Status: "started",
ScheduledFor: scheduledFor.Format(time.RFC3339), ScheduledFor: scheduledFor.Format(time.RFC3339),
} }
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: 1,
}
err := writer.UpdateRunState(context.Background(), task, run.ID, scheduledFor, backend.RunScheduled) startAt := time.Unix(2, 0).UTC()
if err != nil { if err := writer.UpdateRunState(context.Background(), rlb, startAt, backend.RunStarted); err != nil {
t.Fatal(err) t.Fatal(err)
} }
run.StartedAt = startAt.Format(time.RFC3339Nano)
run.Status = "started"
returnedRun, err := reader.FindRunByID(context.Background(), task.Org, task.ID, run.ID) returnedRun, err := reader.FindRunByID(context.Background(), task.Org, task.ID, run.ID)
if err != nil { if err != nil {
@ -66,24 +73,8 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
t.Fatalf("expected: %+v, got: %+v", run, returnedRun) t.Fatalf("expected: %+v, got: %+v", run, returnedRun)
} }
startAt := time.Unix(2, 0).UTC()
if err := writer.UpdateRunState(context.Background(), task, run.ID, startAt, backend.RunStarted); err != nil {
t.Fatal(err)
}
run.StartedAt = startAt.Format(time.RFC3339Nano)
run.Status = "started"
returnedRun, err = reader.FindRunByID(context.Background(), task.Org, task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(run, *returnedRun) {
t.Fatalf("expected: %+v, got: %+v", run, returnedRun)
}
endAt := time.Unix(3, 0).UTC() endAt := time.Unix(3, 0).UTC()
if err := writer.UpdateRunState(context.Background(), task, run.ID, endAt, backend.RunSuccess); err != nil { if err := writer.UpdateRunState(context.Background(), rlb, endAt, backend.RunSuccess); err != nil {
t.Fatal(err) t.Fatal(err)
} }
run.FinishedAt = endAt.Format(time.RFC3339Nano) run.FinishedAt = endAt.Format(time.RFC3339Nano)
@ -108,32 +99,40 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
Org: platform.ID([]byte("ab01ab01ab01ab05")), Org: platform.ID([]byte("ab01ab01ab01ab05")),
} }
sf := time.Now().UTC()
sa := sf.Add(time.Second)
run := platform.Run{ run := platform.Run{
ID: platform.ID([]byte("run")), ID: platform.ID([]byte("run")),
TaskID: task.ID, TaskID: task.ID,
Status: "scheduled", Status: "started",
ScheduledFor: time.Now().UTC().Format(time.RFC3339), ScheduledFor: sf.Format(time.RFC3339),
StartedAt: sa.Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: sf.Unix(),
} }
logTime := time.Now().UTC() logTime := time.Now().UTC()
if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "bad"); err == nil { if err := writer.AddRunLog(context.Background(), rlb, logTime, "bad"); err == nil {
t.Fatal("shouldn't be able to log against non existing run") t.Fatal("shouldn't be able to log against non existing run")
} }
err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunScheduled) err := writer.UpdateRunState(context.Background(), rlb, sa, backend.RunStarted)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "first"); err != nil { if err := writer.AddRunLog(context.Background(), rlb, logTime, "first"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "second"); err != nil { if err := writer.AddRunLog(context.Background(), rlb, logTime, "second"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "third"); err != nil { if err := writer.AddRunLog(context.Background(), rlb, logTime, "third"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -167,11 +166,16 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
scheduledFor := time.Unix(int64(i), 0).UTC() scheduledFor := time.Unix(int64(i), 0).UTC()
runs[i] = platform.Run{ runs[i] = platform.Run{
ID: platform.ID([]byte(fmt.Sprintf("run%d", i))), ID: platform.ID([]byte(fmt.Sprintf("run%d", i))),
Status: "scheduled", Status: "started",
ScheduledFor: scheduledFor.Format(time.RFC3339), ScheduledFor: scheduledFor.Format(time.RFC3339),
} }
rlb := backend.RunLogBase{
Task: task,
RunID: runs[i].ID,
RunScheduledFor: scheduledFor.Unix(),
}
err := writer.UpdateRunState(context.Background(), task, runs[i].ID, scheduledFor, backend.RunScheduled) err := writer.UpdateRunState(context.Background(), rlb, scheduledFor.Add(time.Second), backend.RunStarted)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -256,14 +260,23 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu
ID: platform.ID([]byte("ab01ab01ab01ab01")), ID: platform.ID([]byte("ab01ab01ab01ab01")),
Org: platform.ID([]byte("ab01ab01ab01ab05")), Org: platform.ID([]byte("ab01ab01ab01ab05")),
} }
sf := time.Now().UTC()
sa := sf.Add(time.Second)
run := platform.Run{ run := platform.Run{
ID: platform.ID([]byte("run")), ID: platform.ID([]byte("run")),
TaskID: task.ID, TaskID: task.ID,
Status: "scheduled", Status: "started",
ScheduledFor: time.Now().UTC().Format(time.RFC3339), ScheduledFor: sf.Format(time.RFC3339),
StartedAt: sa.Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: sf.Unix(),
} }
if err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunScheduled); err != nil { if err := writer.UpdateRunState(context.Background(), rlb, sa, backend.RunStarted); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -306,18 +319,24 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
runs := make([]platform.Run, 20) runs := make([]platform.Run, 20)
for i := 0; i < len(runs); i++ { for i := 0; i < len(runs); i++ {
sf := time.Unix(int64(i), 0)
runs[i] = platform.Run{ runs[i] = platform.Run{
ID: platform.ID([]byte(fmt.Sprintf("run%d", i))), ID: platform.ID([]byte(fmt.Sprintf("run%d", i))),
Status: "started", Status: "started",
ScheduledFor: time.Unix(int64(i), 0).Format(time.RFC3339), ScheduledFor: sf.UTC().Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: runs[i].ID,
RunScheduledFor: sf.Unix(),
} }
err := writer.UpdateRunState(context.Background(), task, runs[i].ID, time.Now(), backend.RunScheduled) err := writer.UpdateRunState(context.Background(), rlb, time.Now(), backend.RunStarted)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
writer.AddRunLog(context.Background(), task, runs[i].ID, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i)) writer.AddRunLog(context.Background(), rlb, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i))
} }
logs, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &runs[4].ID}) logs, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &runs[4].ID})

View File

@ -206,7 +206,12 @@ func testTaskRuns(t *testing.T, sys *System) {
t.Fatal(err) t.Fatal(err)
} }
startedAt := time.Now() startedAt := time.Now()
if err := sys.LW.UpdateRunState(sys.Ctx, st, runID, startedAt, backend.RunStarted); err != nil { rlb := backend.RunLogBase{
Task: st,
RunID: runID,
RunScheduledFor: rc.Created.Now,
}
if err := sys.LW.UpdateRunState(sys.Ctx, rlb, startedAt, backend.RunStarted); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -224,24 +229,23 @@ func testTaskRuns(t *testing.T, sys *System) {
r := runs[0] r := runs[0]
if !bytes.Equal(r.ID, runID) { if !bytes.Equal(r.ID, runID) {
t.Fatalf("expected to find run with ID %s, got %s", runID.String(), r.ID.String()) t.Errorf("expected to find run with ID %s, got %s", runID.String(), r.ID.String())
} }
if !bytes.Equal(r.TaskID, task.ID) { if !bytes.Equal(r.TaskID, task.ID) {
t.Fatalf("expected run to have task ID %s, got %s", task.ID.String(), r.TaskID.String()) t.Errorf("expected run to have task ID %s, got %s", task.ID.String(), r.TaskID.String())
} }
if want := startedAt.UTC().Format(time.RFC3339); r.StartedAt != want { if want := startedAt.UTC().Format(time.RFC3339); r.StartedAt != want {
t.Fatalf("expected run to be started at %q, got %q", want, r.StartedAt) t.Errorf("expected run to be started at %q, got %q", want, r.StartedAt)
} }
if want := time.Unix(rc.Created.Now, 0).UTC().Format(time.RFC3339); r.ScheduledFor != want { if want := time.Unix(rc.Created.Now, 0).UTC().Format(time.RFC3339); r.ScheduledFor != want {
// Not yet expected to match. Change to t.Fatalf as part of addressing https://github.com/influxdata/platform/issues/753. t.Errorf("expected run to be scheduled for %q, got %q", want, r.ScheduledFor)
t.Logf("TODO(#753): expected run to be scheduled for %q, got %q", want, r.ScheduledFor)
} }
if want := time.Unix(requestedAtUnix, 0).UTC().Format(time.RFC3339); r.RequestedAt != want { if want := time.Unix(requestedAtUnix, 0).UTC().Format(time.RFC3339); r.RequestedAt != want {
// Not yet expected to match. Change to t.Fatalf as part of addressing https://github.com/influxdata/platform/issues/753. // Not yet expected to match. Change to t.Fatalf as part of addressing https://github.com/influxdata/platform/issues/753.
t.Logf("TODO(#753): expected run to be requested at %q, got %q", want, r.RequestedAt) t.Logf("TODO(#753): expected run to be requested at %q, got %q", want, r.RequestedAt)
} }
if r.FinishedAt != "" { if r.FinishedAt != "" {
t.Fatalf("expected run not be finished, got %q", r.FinishedAt) t.Errorf("expected run not be finished, got %q", r.FinishedAt)
} }
} }