feat(task): include run errors in task log

This should considerably simplify debugging when things go wrong with
the tasks, as this error can be displayed from the UI or CLI. Prior to
this change, you would have to view the console output from influxd.

Fixes #12548.
pull/12645/head
Mark Rushakoff 2019-03-12 16:19:05 -07:00 committed by Mark Rushakoff
parent 50f53d16f7
commit 50b5b25f71
2 changed files with 59 additions and 8 deletions

View File

@ -11,7 +11,7 @@ import (
"time"
"github.com/influxdata/flux"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@ -263,7 +263,6 @@ func (s *TickScheduler) Start(ctx context.Context) {
}
func (s *TickScheduler) Stop() {
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
@ -640,6 +639,22 @@ func (r *runner) clearRunning(id platform.ID) {
r.ts.runningMu.Unlock()
}
// fail sets r's state to failed, and marks this runner as idle.
func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason error) {
rlb := RunLogBase{
Task: r.task,
RunID: qr.RunID,
RunScheduledFor: qr.Now,
RequestedAt: qr.RequestedAt,
}
if err := r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), stage+": "+reason.Error()); err != nil {
runLogger.Info("Failed to update run log", zap.Error(err))
}
r.updateRunState(qr, RunFail, runLogger)
atomic.StoreUint32(r.state, runnerIdle)
}
func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
defer r.wg.Done()
@ -655,8 +670,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
}
// TODO(mr): retry?
atomic.StoreUint32(r.state, runnerIdle)
r.updateRunState(qr, RunFail, runLogger)
r.fail(qr, runLogger, "Run failed to begin execution", err)
return
}
@ -697,8 +711,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
}
// TODO(mr): retry?
r.updateRunState(qr, RunFail, runLogger)
atomic.StoreUint32(r.state, runnerIdle)
r.fail(qr, runLogger, "Waiting for execution result", err)
return
}
if err := rr.Err(); err != nil {
@ -708,8 +721,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
runLogger.Error("Run failed to execute, and desired state update failed", zap.Error(err))
}
// TODO(mr): retry?
r.updateRunState(qr, RunFail, runLogger)
atomic.StoreUint32(r.state, runnerIdle)
r.fail(qr, runLogger, "Run failed to execute", err)
return
}

View File

@ -396,6 +396,37 @@ func TestScheduler_Queue(t *testing.T) {
}
}
func pollForRunLog(t *testing.T, r backend.LogReader, taskID, runID, orgID platform.ID, exp string) {
t.Helper()
var logs []platform.Log
var err error
const maxAttempts = 50
for i := 0; i < maxAttempts; i++ {
if i != 0 {
time.Sleep(10 * time.Millisecond)
}
logs, err = r.ListLogs(context.Background(), orgID, platform.LogFilter{Task: taskID, Run: &runID})
if err != nil {
t.Fatal(err)
}
for _, log := range logs {
if log.Message == exp {
return
}
}
}
t.Logf("Didn't find message %q in logs:", exp)
for _, log := range logs {
t.Logf("\t%s", log.Message)
}
t.FailNow()
}
// pollForRunStatus tries a few times to find runs matching supplied conditions, before failing.
func pollForRunStatus(t *testing.T, r backend.LogReader, taskID, orgID platform.ID, expCount, expIndex int, expStatus string) {
t.Helper()
@ -570,6 +601,7 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
pollForRunLog(t, rl, task.ID, promises[0].Run().RunID, task.Org, "Waiting for execution result: forced failure")
// Should continue even if max concurrency == 1.
// This run will start and then fail.
@ -583,6 +615,7 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
pollForRunLog(t, rl, task.ID, promises[0].Run().RunID, task.Org, "Run failed to execute: started but failed to finish properly")
// Fail to execute next run.
if n := d.TotalRunsCreatedForTask(task.ID); n != 2 {
@ -603,6 +636,12 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
t.Fatalf("expected 3 runs created, got %d", n)
}
}
// We don't have a good hook to get the run ID right now, so list the runs and assume the final one is ours.
runs, err := rl.ListRuns(context.Background(), task.Org, platform.RunFilter{Task: task.ID})
if err != nil {
t.Fatal(err)
}
pollForRunLog(t, rl, task.ID, runs[len(runs)-1].ID, task.Org, "Run failed to begin execution: forced failure on Execute")
// One more tick just to ensure that we can keep going after this type of failure too.
s.Tick(9)