From 364e80bc94fdcf9378ad5bafa7a2e255ebad1062 Mon Sep 17 00:00:00 2001 From: Alirie Gray Date: Thu, 10 Oct 2019 09:55:30 -0700 Subject: [PATCH] fix(tasks): use go errors for scheduler metrics (#15374) --- task/backend/scheduler.go | 26 ++++++++++---------------- task/backend/scheduler_metrics.go | 9 +++++++-- task_errors.go | 9 --------- 3 files changed, 17 insertions(+), 27 deletions(-) diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index 568b90f223..66b59c383c 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -712,16 +712,12 @@ func (r *runner) clearRunning(id platform.ID) { } // fail sets r's state to failed, and marks this runner as idle. -func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason influxdb.Error) { - influxErr := &reason - err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error()) - - if err != nil { +func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason error) { + if err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error()); err != nil { runLogger.Info("Failed to update run log", zap.Error(err)) - influxErr = influxdb.ErrCouldNotLogError(err) } - r.updateRunState(qr, RunFail, runLogger, influxErr) + r.updateRunState(qr, RunFail, runLogger, reason) atomic.StoreUint32(r.state, runnerIdle) } @@ -749,8 +745,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za errMsg = "Beginning run execution failed, " + errMsg // TODO(mr): retry? - influxErr := *influxdb.ErrRunExecutionError(err) - r.fail(qr, runLogger, "Run failed to begin execution", influxErr) + r.fail(qr, runLogger, "Run failed to begin execution", influxdb.ErrRunExecutionError(err)) return } @@ -776,7 +771,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za close(ready) if err != nil { if err == platform.ErrRunCanceled { - r.updateRunState(qr, RunCanceled, runLogger, err.(*influxdb.Error)) + r.updateRunState(qr, RunCanceled, runLogger, err) errMsg = "Waiting for execution result failed, " + errMsg // Move on to the next execution, for a canceled run. r.startFromWorking(atomic.LoadInt64(r.ts.now)) @@ -786,7 +781,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za runLogger.Info("Failed to wait for execution result", zap.Error(err)) // TODO(mr): retry? - r.fail(qr, runLogger, "Waiting for execution result", *influxdb.ErrRunExecutionError(err)) + r.fail(qr, runLogger, "Waiting for execution result", influxdb.ErrRunExecutionError(err)) return } if err := rr.Err(); err != nil { @@ -794,16 +789,15 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za errMsg = "Run failed to execute, " + errMsg // TODO(mr): retry? - r.fail(qr, runLogger, "Run failed to execute", *influxdb.ErrRunExecutionError(err)) + r.fail(qr, runLogger, "Run failed to execute", influxdb.ErrRunExecutionError(err)) return } stats := rr.Statistics() - var influxErr *influxdb.Error b, err := json.Marshal(stats) if err != nil { - influxErr = influxdb.ErrJsonMarshalError(err) + err = influxdb.ErrJsonMarshalError(err) } else { // authctx can be updated mid process r.ts.nextDueMu.RLock() @@ -811,14 +805,14 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za r.ts.nextDueMu.RUnlock() r.taskControlService.AddRunLog(authCtx, r.task.ID, qr.RunID, time.Now(), string(b)) } - r.updateRunState(qr, RunSuccess, runLogger, influxErr) + r.updateRunState(qr, RunSuccess, runLogger, err) runLogger.Debug("Execution succeeded") // Check again if there is a new run available, without returning to idle state. r.startFromWorking(atomic.LoadInt64(r.ts.now)) } -func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger, err *influxdb.Error) { +func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger, err error) { switch s { case RunStarted: dueAt := time.Unix(qr.DueAt, 0) diff --git a/task/backend/scheduler_metrics.go b/task/backend/scheduler_metrics.go index 671554088a..836c9fecaa 100644 --- a/task/backend/scheduler_metrics.go +++ b/task/backend/scheduler_metrics.go @@ -132,8 +132,13 @@ func (sm *schedulerMetrics) FinishRun(tid string, succeeded bool, executionDelta } // LogError increments the count of errors. -func (sm *schedulerMetrics) LogError(err *influxdb.Error) { - sm.errorsCounter.WithLabelValues(err.Code).Inc() +func (sm *schedulerMetrics) LogError(err error) { + switch e := err.(type) { + case *influxdb.Error: + sm.errorsCounter.WithLabelValues(e.Code).Inc() + default: + sm.errorsCounter.WithLabelValues("unknown").Inc() + } } // ClaimTask adjusts the metrics to indicate the result of an attempted claim. diff --git a/task_errors.go b/task_errors.go index 9da09359c8..214e4bc40d 100644 --- a/task_errors.go +++ b/task_errors.go @@ -167,15 +167,6 @@ func ErrRunNotDueYet(dueAt int64) *Error { } } -func ErrCouldNotLogError(err error) *Error { - return &Error{ - Code: EInternal, - Msg: fmt.Sprintf("unable to log error; Err: %v", err), - Op: "taskScheduler", - Err: err, - } -} - func ErrJsonMarshalError(err error) *Error { return &Error{ Code: EInvalid,