From 50b5b25f71646f511328a018ed3cd8caf690b57e Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Tue, 12 Mar 2019 16:19:05 -0700 Subject: [PATCH] feat(task): include run errors in task log This should considerably simplify debugging when things go wrong with the tasks, as this error can be displayed from the UI or CLI. Prior to this change, you would have to view the console output from influxd. Fixes #12548. --- task/backend/scheduler.go | 28 +++++++++++++++++------- task/backend/scheduler_test.go | 39 ++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index 3db8d5e0e2..4f24924ed0 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -11,7 +11,7 @@ import ( "time" "github.com/influxdata/flux" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -263,7 +263,6 @@ func (s *TickScheduler) Start(ctx context.Context) { } func (s *TickScheduler) Stop() { - s.schedulerMu.Lock() defer s.schedulerMu.Unlock() @@ -640,6 +639,22 @@ func (r *runner) clearRunning(id platform.ID) { r.ts.runningMu.Unlock() } +// fail sets r's state to failed, and marks this runner as idle. +func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason error) { + rlb := RunLogBase{ + Task: r.task, + RunID: qr.RunID, + RunScheduledFor: qr.Now, + RequestedAt: qr.RequestedAt, + } + if err := r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), stage+": "+reason.Error()); err != nil { + runLogger.Info("Failed to update run log", zap.Error(err)) + } + + r.updateRunState(qr, RunFail, runLogger) + atomic.StoreUint32(r.state, runnerIdle) +} + func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) { defer r.wg.Done() @@ -655,8 +670,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za } // TODO(mr): retry? - atomic.StoreUint32(r.state, runnerIdle) - r.updateRunState(qr, RunFail, runLogger) + r.fail(qr, runLogger, "Run failed to begin execution", err) return } @@ -697,8 +711,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za } // TODO(mr): retry? - r.updateRunState(qr, RunFail, runLogger) - atomic.StoreUint32(r.state, runnerIdle) + r.fail(qr, runLogger, "Waiting for execution result", err) return } if err := rr.Err(); err != nil { @@ -708,8 +721,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za runLogger.Error("Run failed to execute, and desired state update failed", zap.Error(err)) } // TODO(mr): retry? - r.updateRunState(qr, RunFail, runLogger) - atomic.StoreUint32(r.state, runnerIdle) + r.fail(qr, runLogger, "Run failed to execute", err) return } diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index 0371328aca..8570231385 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -396,6 +396,37 @@ func TestScheduler_Queue(t *testing.T) { } } +func pollForRunLog(t *testing.T, r backend.LogReader, taskID, runID, orgID platform.ID, exp string) { + t.Helper() + + var logs []platform.Log + var err error + + const maxAttempts = 50 + for i := 0; i < maxAttempts; i++ { + if i != 0 { + time.Sleep(10 * time.Millisecond) + } + + logs, err = r.ListLogs(context.Background(), orgID, platform.LogFilter{Task: taskID, Run: &runID}) + if err != nil { + t.Fatal(err) + } + + for _, log := range logs { + if log.Message == exp { + return + } + } + } + + t.Logf("Didn't find message %q in logs:", exp) + for _, log := range logs { + t.Logf("\t%s", log.Message) + } + t.FailNow() +} + // pollForRunStatus tries a few times to find runs matching supplied conditions, before failing. func pollForRunStatus(t *testing.T, r backend.LogReader, taskID, orgID platform.ID, expCount, expIndex int, expStatus string) { t.Helper() @@ -570,6 +601,7 @@ func TestScheduler_RunFailureCleanup(t *testing.T) { if _, err := e.PollForNumberRunning(task.ID, 0); err != nil { t.Fatal(err) } + pollForRunLog(t, rl, task.ID, promises[0].Run().RunID, task.Org, "Waiting for execution result: forced failure") // Should continue even if max concurrency == 1. // This run will start and then fail. @@ -583,6 +615,7 @@ func TestScheduler_RunFailureCleanup(t *testing.T) { if _, err := e.PollForNumberRunning(task.ID, 0); err != nil { t.Fatal(err) } + pollForRunLog(t, rl, task.ID, promises[0].Run().RunID, task.Org, "Run failed to execute: started but failed to finish properly") // Fail to execute next run. if n := d.TotalRunsCreatedForTask(task.ID); n != 2 { @@ -603,6 +636,12 @@ func TestScheduler_RunFailureCleanup(t *testing.T) { t.Fatalf("expected 3 runs created, got %d", n) } } + // We don't have a good hook to get the run ID right now, so list the runs and assume the final one is ours. + runs, err := rl.ListRuns(context.Background(), task.Org, platform.RunFilter{Task: task.ID}) + if err != nil { + t.Fatal(err) + } + pollForRunLog(t, rl, task.ID, runs[len(runs)-1].ID, task.Org, "Run failed to begin execution: forced failure on Execute") // One more tick just to ensure that we can keep going after this type of failure too. s.Tick(9)