diff --git a/task/backend/executor/executor_metrics.go b/task/backend/executor/executor_metrics.go
index 287ae6edd7..1820e5dd8d 100644
--- a/task/backend/executor/executor_metrics.go
+++ b/task/backend/executor/executor_metrics.go
@@ -13,7 +13,7 @@ type ExecutorMetrics struct {
 	activeRuns        prometheus.Collector
 	queueDelta        *prometheus.SummaryVec
 	runDuration       *prometheus.SummaryVec
-	errorsCounter     prometheus.Counter
+	errorsCounter     *prometheus.CounterVec
 	manualRunsCounter *prometheus.CounterVec
 	resumeRunsCounter *prometheus.CounterVec
 }
@@ -55,12 +55,12 @@ func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics {
 			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
 		}, []string{"taskID"}),
 
-		errorsCounter: prometheus.NewCounter(prometheus.CounterOpts{
+		errorsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
 			Namespace: namespace,
 			Subsystem: subsystem,
 			Name:      "errors_counter",
-			Help:      "The number of errors thrown by the executor.",
-		}),
+			Help:      "The number of errors thrown by the executor with the type of error (ex. Flux compile, query, etc).",
+		}, []string{"errorType"}),
 
 		manualRunsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
 			Namespace: namespace,
@@ -131,8 +131,8 @@ func (em *ExecutorMetrics) FinishRun(taskID influxdb.ID, status backend.RunStatu
 }
 
-// LogError increments the count of errors.
-func (em *ExecutorMetrics) LogError() {
-	em.errorsCounter.Inc()
+// LogError increments the count of errors, using err.Code as the "errorType" label value.
+func (em *ExecutorMetrics) LogError(err *influxdb.Error) {
+	em.errorsCounter.WithLabelValues(err.Code).Inc()
 }
 
 // Describe returns all descriptions associated with the run collector.
diff --git a/task/backend/executor/task_executor.go b/task/backend/executor/task_executor.go
index b6d0045fd3..9f5a4a81a8 100644
--- a/task/backend/executor/task_executor.go
+++ b/task/backend/executor/task_executor.go
@@ -316,7 +316,7 @@ func (w *worker) start(p *Promise) {
 	w.te.metrics.StartRun(p.task.ID, time.Since(p.createdAt))
 }
 
-func (w *worker) finish(p *Promise, rs backend.RunStatus, err error) {
+func (w *worker) finish(p *Promise, rs backend.RunStatus, err *influxdb.Error) {
 	// trace
 	span, ctx := tracing.StartSpanFromContext(p.ctx)
 	defer span.Finish()
@@ -332,9 +332,9 @@ func (w *worker) finish(p *Promise, rs backend.RunStatus, err error) {
 	w.te.metrics.FinishRun(p.task.ID, rs, rd)
 
 	// log error
-	if err != nil {
+	if err != nil && err.Err != nil {
 		w.te.logger.Debug("execution failed", zap.Error(err), zap.String("taskID", p.task.ID.String()))
-		w.te.metrics.LogError()
+		w.te.metrics.LogError(err)
 		p.err = err
 	} else {
 		w.te.logger.Debug("Completed successfully", zap.String("taskID", p.task.ID.String()))
@@ -350,13 +350,13 @@ func (w *worker) executeQuery(p *Promise) {
 
 	pkg, err := flux.Parse(p.task.Flux)
 	if err != nil {
-		w.finish(p, backend.RunFail, err)
+		w.finish(p, backend.RunFail, influxdb.ErrFluxParseError(err))
 		return
 	}
 
 	sf, err := p.run.ScheduledForTime()
 	if err != nil {
-		w.finish(p, backend.RunFail, err)
+		w.finish(p, backend.RunFail, influxdb.ErrTaskTimeParse(err))
 		return
 	}
 
@@ -372,7 +372,7 @@ func (w *worker) executeQuery(p *Promise) {
 	it, err := w.te.qs.Query(ctx, req)
 	if err != nil {
 		// Assume the error should not be part of the runResult.
-		w.finish(p, backend.RunFail, err)
+		w.finish(p, backend.RunFail, influxdb.ErrQueryError(err))
 		return
 	}
 
@@ -400,7 +400,7 @@ func (w *worker) executeQuery(p *Promise) {
 		w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), string(b))
 	}
 
-	w.finish(p, backend.RunSuccess, runErr)
+	w.finish(p, backend.RunSuccess, influxdb.ErrResultIteratorError(runErr))
 }
 
 // RunsActive returns the current number of workers, which is equivalent to
diff --git a/task/backend/executor/task_executor_test.go b/task/backend/executor/task_executor_test.go
index f49034f4b0..d7e0866e4c 100644
--- a/task/backend/executor/task_executor_test.go
+++ b/task/backend/executor/task_executor_test.go
@@ -267,6 +267,7 @@ func testLimitFunc(t *testing.T) {
 		t.Fatal(err)
 	}
 	forcedErr := errors.New("forced")
+	forcedQueryErr := influxdb.ErrQueryError(forcedErr)
 	tes.svc.FailNextQuery(forcedErr)
 
 	count := 0
@@ -285,7 +286,7 @@ func testLimitFunc(t *testing.T) {
 
 	<-promise.Done()
 
-	if got := promise.Error(); got != forcedErr {
+	if got := promise.Error(); got == nil || got.Error() != forcedQueryErr.Error() {
 		t.Fatal("failed to get failure from forced error")
 	}
 
diff --git a/task_errors.go b/task_errors.go
index f01a4a4ae9..80fbf0a497 100644
--- a/task_errors.go
+++ b/task_errors.go
@@ -86,6 +86,36 @@ var (
 	}
 )
 
+// ErrFluxParseError wraps an error thrown by flux.Parse in the task executor as an EInvalid *Error.
+func ErrFluxParseError(err error) *Error {
+	return &Error{
+		Code: EInvalid,
+		Msg:  fmt.Sprintf("could not parse Flux script; Err: %v", err),
+		Op:   "kv/taskExecutor",
+		Err:  err,
+	}
+}
+
+// ErrQueryError wraps an error thrown by the Query service in the task executor as an EInternal *Error.
+func ErrQueryError(err error) *Error {
+	return &Error{
+		Code: EInternal,
+		Msg:  fmt.Sprintf("unexpected error from queryd; Err: %v", err),
+		Op:   "kv/taskExecutor",
+		Err:  err,
+	}
+}
+
+// ErrResultIteratorError wraps an error thrown by exhaustResultIterators in the executor as an EInternal *Error.
+func ErrResultIteratorError(err error) *Error {
+	return &Error{
+		Code: EInternal,
+		Msg:  fmt.Sprintf("Error exhausting result iterator; Err: %v", err),
+		Op:   "kv/taskExecutor",
+		Err:  err,
+	}
+}
+
 func ErrInternalTaskServiceError(err error) *Error {
 	return &Error{
 		Code: EInternal,