From aef199bcc1a503c042a067e3fec578a5faf484aa Mon Sep 17 00:00:00 2001
From: Alirie Gray
Date: Mon, 16 Sep 2019 13:55:39 -0700
Subject: [PATCH] fix(tasks): use influxdb errors in scheduler (#15145)

---
 task/backend/executor/executor_metrics.go |  2 +-
 task/backend/scheduler.go                 | 35 ++++++++++++++++-------
 task/backend/scheduler_metrics.go         | 16 +++++++++++
 task/backend/scheduler_test.go            |  5 ++++
 task_errors.go                            | 34 +++++++++++++++++++++-
 5 files changed, 79 insertions(+), 13 deletions(-)

diff --git a/task/backend/executor/executor_metrics.go b/task/backend/executor/executor_metrics.go
index 1820e5dd8d..fdf09bb042 100644
--- a/task/backend/executor/executor_metrics.go
+++ b/task/backend/executor/executor_metrics.go
@@ -59,7 +59,7 @@ func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics {
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "errors_counter",
-			Help:      "The number of errors thrown by the executor with the type of error (ex. Flux compile, query, etc).",
+			Help:      "The number of errors thrown by the executor with the type of error (ex. Invalid, Internal, etc.)",
 		}, []string{"errorType"}),

 		manualRunsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go
index 3d77783001..568b90f223 100644
--- a/task/backend/scheduler.go
+++ b/task/backend/scheduler.go
@@ -11,6 +11,7 @@ import (
 	"time"

 	"github.com/influxdata/flux"
+	"github.com/influxdata/influxdb"
 	platform "github.com/influxdata/influxdb"
 	"github.com/influxdata/influxdb/kit/tracing"
 	"github.com/influxdata/influxdb/logger"
@@ -711,17 +712,21 @@ func (r *runner) clearRunning(id platform.ID) {
 }

 // fail sets r's state to failed, and marks this runner as idle.
-func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason error) {
-	if err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error()); err != nil {
+func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason influxdb.Error) {
+	influxErr := &reason
+	err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error())
+
+	if err != nil {
 		runLogger.Info("Failed to update run log", zap.Error(err))
+		influxErr = influxdb.ErrCouldNotLogError(err)
 	}

-	r.updateRunState(qr, RunFail, runLogger)
+	r.updateRunState(qr, RunFail, runLogger, influxErr)
 	atomic.StoreUint32(r.state, runnerIdle)
 }

 func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
-	r.updateRunState(qr, RunStarted, runLogger)
+	r.updateRunState(qr, RunStarted, runLogger, nil)
 	qr.startedAt = time.Now()
 	defer r.wg.Done()
 	errMsg := "Failed to finish run"
@@ -743,7 +748,9 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
 		runLogger.Info("Failed to begin run execution", zap.Error(err))
 		errMsg = "Beginning run execution failed, " + errMsg
 		// TODO(mr): retry?
-		r.fail(qr, runLogger, "Run failed to begin execution", err)
+
+		influxErr := *influxdb.ErrRunExecutionError(err)
+		r.fail(qr, runLogger, "Run failed to begin execution", influxErr)
 		return
 	}

@@ -769,7 +776,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
 	close(ready)
 	if err != nil {
 		if err == platform.ErrRunCanceled {
-			r.updateRunState(qr, RunCanceled, runLogger)
+			r.updateRunState(qr, RunCanceled, runLogger, err.(*influxdb.Error))
 			errMsg = "Waiting for execution result failed, " + errMsg
 			// Move on to the next execution, for a canceled run.
 			r.startFromWorking(atomic.LoadInt64(r.ts.now))
@@ -779,7 +786,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
 		runLogger.Info("Failed to wait for execution result", zap.Error(err))
 		// TODO(mr): retry?
-		r.fail(qr, runLogger, "Waiting for execution result", err)
+		r.fail(qr, runLogger, "Waiting for execution result", *influxdb.ErrRunExecutionError(err))
 		return
 	}

 	if err := rr.Err(); err != nil {
@@ -787,28 +794,31 @@
 		runLogger.Info("Failed to execute run", zap.Error(err))
 		errMsg = "Run failed to execute, " + errMsg
 		// TODO(mr): retry?
-		r.fail(qr, runLogger, "Run failed to execute", err)
+		r.fail(qr, runLogger, "Run failed to execute", *influxdb.ErrRunExecutionError(err))
 		return
 	}

 	stats := rr.Statistics()
+	var influxErr *influxdb.Error
 	b, err := json.Marshal(stats)
-	if err == nil {
+	if err != nil {
+		influxErr = influxdb.ErrJsonMarshalError(err)
+	} else {
 		// authctx can be updated mid process
 		r.ts.nextDueMu.RLock()
 		authCtx := r.ts.authCtx
 		r.ts.nextDueMu.RUnlock()

 		r.taskControlService.AddRunLog(authCtx, r.task.ID, qr.RunID, time.Now(), string(b))
 	}
-	r.updateRunState(qr, RunSuccess, runLogger)
+	r.updateRunState(qr, RunSuccess, runLogger, influxErr)
 	runLogger.Debug("Execution succeeded")

 	// Check again if there is a new run available, without returning to idle state.
 	r.startFromWorking(atomic.LoadInt64(r.ts.now))
 }

-func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger) {
+func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger, err *influxdb.Error) {
 	switch s {
 	case RunStarted:
 		dueAt := time.Unix(qr.DueAt, 0)
@@ -827,6 +837,9 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger
 		// There is not really a notion of being queued in this runner architecture.
 		runLogger.Warn("Unhandled run state", zap.Stringer("state", s))
 	}
+	if err != nil {
+		r.ts.metrics.LogError(err)
+	}

 	if err := r.taskControlService.UpdateRunState(r.ctx, r.task.ID, qr.RunID, time.Now(), s); err != nil {
 		runLogger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err))
diff --git a/task/backend/scheduler_metrics.go b/task/backend/scheduler_metrics.go
index 9e82cd63b8..671554088a 100644
--- a/task/backend/scheduler_metrics.go
+++ b/task/backend/scheduler_metrics.go
@@ -3,6 +3,7 @@ package backend
 import (
 	"time"

+	"github.com/influxdata/influxdb"
 	"github.com/prometheus/client_golang/prometheus"
 )

@@ -16,6 +17,8 @@ type schedulerMetrics struct {
 	runsComplete *prometheus.CounterVec
 	runsActive   *prometheus.GaugeVec

+	errorsCounter *prometheus.CounterVec
+
 	claimsComplete *prometheus.CounterVec
 	claimsActive   prometheus.Gauge

@@ -54,6 +57,13 @@ func newSchedulerMetrics() *schedulerMetrics {
 			Help:      "Total number of runs that have started but not yet completed, split out by task ID.",
 		}, []string{"task_id"}),

+		errorsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "errors_counter",
+			Help:      "The number of errors thrown by the scheduler with the type of error (ex. Invalid, Internal, etc.)",
+		}, []string{"error_type"}),
+
 		claimsComplete: prometheus.NewCounterVec(prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
@@ -94,6 +104,7 @@ func (sm *schedulerMetrics) PrometheusCollectors() []prometheus.Collector {
 		sm.claimsActive,
 		sm.queueDelta,
 		sm.executionDelta,
+		sm.errorsCounter,
 	}
 }

@@ -120,6 +131,11 @@ func (sm *schedulerMetrics) FinishRun(tid string, succeeded bool, executionDelta
 	sm.executionDelta.WithLabelValues(tid).Observe(executionDelta.Seconds())
 }

+// LogError increments the count of errors.
+func (sm *schedulerMetrics) LogError(err *influxdb.Error) {
+	sm.errorsCounter.WithLabelValues(err.Code).Inc()
+}
+
 // ClaimTask adjusts the metrics to indicate the result of an attempted claim.
 func (sm *schedulerMetrics) ClaimTask(succeeded bool) {
 	status := statusString(succeeded)
diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go
index a9a6bda01f..01584c7e28 100644
--- a/task/backend/scheduler_test.go
+++ b/task/backend/scheduler_test.go
@@ -921,6 +921,11 @@ func TestScheduler_Metrics(t *testing.T) {
 		t.Fatalf("expected 1 run failed for task ID %s, got %v", task.ID.String(), got)
 	}

+	m = promtest.MustFindMetric(t, mfs, "task_scheduler_errors_counter", map[string]string{"error_type": "internal error"})
+	if got := *m.Counter.Value; got != 1 {
+		t.Fatalf("expected 1 error with error_type internal error, got %v", got)
+	}
+
 	// Runs label removed after task released.
 	if err := s.ReleaseTask(task.ID); err != nil {
 		t.Fatal(err)
diff --git a/task_errors.go b/task_errors.go
index 80fbf0a497..fa9e76809d 100644
--- a/task_errors.go
+++ b/task_errors.go
@@ -53,6 +53,11 @@
 		Msg:  "run not found",
 	}

+	ErrRunKeyNotFound = &Error{
+		Code: ENotFound,
+		Msg:  "run key not found",
+	}
+
 	ErrPageSizeTooSmall = &Error{
 		Msg:  "cannot have negative page limit",
 		Code: EInvalid,
@@ -109,7 +114,7 @@ func ErrQueryError(err error) *Error {
 // ErrResultIteratorError is returned when an error is thrown by exhaustResultIterators in the executor
 func ErrResultIteratorError(err error) *Error {
 	return &Error{
-		Code: EInternal,
+		Code: EInvalid,
 		Msg:  fmt.Sprintf("Error exhausting result iterator; Err: %v", err),
 		Op:   "kv/taskExecutor",
 		Err:  err,
@@ -161,3 +166,30 @@ func ErrRunNotDueYet(dueAt int64) *Error {
 		Msg: fmt.Sprintf("run not due until: %v", time.Unix(dueAt, 0).UTC().Format(time.RFC3339)),
 	}
 }
+
+func ErrCouldNotLogError(err error) *Error {
+	return &Error{
+		Code: EInternal,
+		Msg:  fmt.Sprintf("unable to log error; Err: %v", err),
+		Op:   "kv/taskScheduler",
+		Err:  err,
+	}
+}
+
+func ErrJsonMarshalError(err error) *Error {
+	return &Error{
+		Code: EInvalid,
+		Msg:  fmt.Sprintf("unable to marshal JSON; Err: %v", err),
+		Op:   "kv/taskScheduler",
+		Err:  err,
+	}
+}
+
+func ErrRunExecutionError(err error) *Error {
+	return &Error{
+		Code: EInternal,
+		Msg:  fmt.Sprintf("could not execute task run; Err: %v", err),
+		Op:   "kv/taskScheduler",
+		Err:  err,
+	}
+}