fix(tasks): use go errors for scheduler metrics (#15374)
parent
072270d7d8
commit
364e80bc94
|
|
@ -712,16 +712,12 @@ func (r *runner) clearRunning(id platform.ID) {
|
|||
}
|
||||
|
||||
// fail sets r's state to failed, and marks this runner as idle.
|
||||
func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason influxdb.Error) {
|
||||
influxErr := &reason
|
||||
err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error())
|
||||
|
||||
if err != nil {
|
||||
func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason error) {
|
||||
if err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error()); err != nil {
|
||||
runLogger.Info("Failed to update run log", zap.Error(err))
|
||||
influxErr = influxdb.ErrCouldNotLogError(err)
|
||||
}
|
||||
|
||||
r.updateRunState(qr, RunFail, runLogger, influxErr)
|
||||
r.updateRunState(qr, RunFail, runLogger, reason)
|
||||
atomic.StoreUint32(r.state, runnerIdle)
|
||||
}
|
||||
|
||||
|
|
@ -749,8 +745,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
|
|||
errMsg = "Beginning run execution failed, " + errMsg
|
||||
// TODO(mr): retry?
|
||||
|
||||
influxErr := *influxdb.ErrRunExecutionError(err)
|
||||
r.fail(qr, runLogger, "Run failed to begin execution", influxErr)
|
||||
r.fail(qr, runLogger, "Run failed to begin execution", influxdb.ErrRunExecutionError(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -776,7 +771,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
|
|||
close(ready)
|
||||
if err != nil {
|
||||
if err == platform.ErrRunCanceled {
|
||||
r.updateRunState(qr, RunCanceled, runLogger, err.(*influxdb.Error))
|
||||
r.updateRunState(qr, RunCanceled, runLogger, err)
|
||||
errMsg = "Waiting for execution result failed, " + errMsg
|
||||
// Move on to the next execution, for a canceled run.
|
||||
r.startFromWorking(atomic.LoadInt64(r.ts.now))
|
||||
|
|
@ -786,7 +781,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
|
|||
runLogger.Info("Failed to wait for execution result", zap.Error(err))
|
||||
|
||||
// TODO(mr): retry?
|
||||
r.fail(qr, runLogger, "Waiting for execution result", *influxdb.ErrRunExecutionError(err))
|
||||
r.fail(qr, runLogger, "Waiting for execution result", influxdb.ErrRunExecutionError(err))
|
||||
return
|
||||
}
|
||||
if err := rr.Err(); err != nil {
|
||||
|
|
@ -794,16 +789,15 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
|
|||
errMsg = "Run failed to execute, " + errMsg
|
||||
|
||||
// TODO(mr): retry?
|
||||
r.fail(qr, runLogger, "Run failed to execute", *influxdb.ErrRunExecutionError(err))
|
||||
r.fail(qr, runLogger, "Run failed to execute", influxdb.ErrRunExecutionError(err))
|
||||
return
|
||||
}
|
||||
|
||||
stats := rr.Statistics()
|
||||
|
||||
var influxErr *influxdb.Error
|
||||
b, err := json.Marshal(stats)
|
||||
if err != nil {
|
||||
influxErr = influxdb.ErrJsonMarshalError(err)
|
||||
err = influxdb.ErrJsonMarshalError(err)
|
||||
} else {
|
||||
// authctx can be updated mid process
|
||||
r.ts.nextDueMu.RLock()
|
||||
|
|
@ -811,14 +805,14 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
|
|||
r.ts.nextDueMu.RUnlock()
|
||||
r.taskControlService.AddRunLog(authCtx, r.task.ID, qr.RunID, time.Now(), string(b))
|
||||
}
|
||||
r.updateRunState(qr, RunSuccess, runLogger, influxErr)
|
||||
r.updateRunState(qr, RunSuccess, runLogger, err)
|
||||
runLogger.Debug("Execution succeeded")
|
||||
|
||||
// Check again if there is a new run available, without returning to idle state.
|
||||
r.startFromWorking(atomic.LoadInt64(r.ts.now))
|
||||
}
|
||||
|
||||
func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger, err *influxdb.Error) {
|
||||
func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger, err error) {
|
||||
switch s {
|
||||
case RunStarted:
|
||||
dueAt := time.Unix(qr.DueAt, 0)
|
||||
|
|
|
|||
|
|
@ -132,8 +132,13 @@ func (sm *schedulerMetrics) FinishRun(tid string, succeeded bool, executionDelta
|
|||
}
|
||||
|
||||
// LogError increments the count of errors.
|
||||
func (sm *schedulerMetrics) LogError(err *influxdb.Error) {
|
||||
sm.errorsCounter.WithLabelValues(err.Code).Inc()
|
||||
func (sm *schedulerMetrics) LogError(err error) {
|
||||
switch e := err.(type) {
|
||||
case *influxdb.Error:
|
||||
sm.errorsCounter.WithLabelValues(e.Code).Inc()
|
||||
default:
|
||||
sm.errorsCounter.WithLabelValues("unknown").Inc()
|
||||
}
|
||||
}
|
||||
|
||||
// ClaimTask adjusts the metrics to indicate the result of an attempted claim.
|
||||
|
|
|
|||
|
|
@ -167,15 +167,6 @@ func ErrRunNotDueYet(dueAt int64) *Error {
|
|||
}
|
||||
}
|
||||
|
||||
func ErrCouldNotLogError(err error) *Error {
|
||||
return &Error{
|
||||
Code: EInternal,
|
||||
Msg: fmt.Sprintf("unable to log error; Err: %v", err),
|
||||
Op: "taskScheduler",
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
func ErrJsonMarshalError(err error) *Error {
|
||||
return &Error{
|
||||
Code: EInvalid,
|
||||
|
|
|
|||
Loading…
Reference in New Issue