From be28de8fbc9d95f169bab3ae88729f56035aedec Mon Sep 17 00:00:00 2001 From: Alirie Gray Date: Wed, 9 Oct 2019 13:51:03 -0700 Subject: [PATCH] feat(tasks): deactivate task on unrecoverable error (#15369) --- task/backend/check_task_error.go | 26 ++++++++++++++++ task/backend/executor/executor_metrics.go | 9 ++++-- task/backend/executor/task_executor.go | 30 ++++++++++++++----- task/backend/executor/task_executor_test.go | 33 +++++++++++++++++++++ task_errors.go | 22 +++++++------- 5 files changed, 100 insertions(+), 20 deletions(-) create mode 100644 task/backend/check_task_error.go diff --git a/task/backend/check_task_error.go b/task/backend/check_task_error.go new file mode 100644 index 0000000000..0102dc10b4 --- /dev/null +++ b/task/backend/check_task_error.go @@ -0,0 +1,26 @@ +package backend + +import ( + "strings" +) + +// IsUnrecoverable takes in an error and determines if it is permanent (requiring user intervention to fix) +func IsUnrecoverable(err error) bool { + if err == nil { + return false + } + + errString := err.Error() + + // missing bucket requires user intervention to resolve + if strings.Contains(errString, "could not find bucket") { + return true + } + + // unparseable Flux requires user intervention to resolve + if strings.Contains(errString, "could not parse Flux script") { + return true + } + + return false +} diff --git a/task/backend/executor/executor_metrics.go b/task/backend/executor/executor_metrics.go index 85257bf10f..e3591033a8 100644 --- a/task/backend/executor/executor_metrics.go +++ b/task/backend/executor/executor_metrics.go @@ -131,8 +131,13 @@ func (em *ExecutorMetrics) FinishRun(task *influxdb.Task, status backend.RunStat } // LogError increments the count of errors. -func (em *ExecutorMetrics) LogError(taskType string, err *influxdb.Error) { - em.errorsCounter.WithLabelValues(taskType, err.Code).Inc() +func (em *ExecutorMetrics) LogError(taskType string, err error) { + switch e := err.(type) { + case *influxdb.Error: + em.errorsCounter.WithLabelValues(taskType, e.Code).Inc() + default: + em.errorsCounter.WithLabelValues(taskType, "unknown").Inc() + } } // Describe returns all descriptions associated with the run collector. diff --git a/task/backend/executor/task_executor.go b/task/backend/executor/task_executor.go index d1d75934c7..8aff87f98c 100644 --- a/task/backend/executor/task_executor.go +++ b/task/backend/executor/task_executor.go @@ -313,7 +313,7 @@ func (w *worker) start(p *promise) { w.te.metrics.StartRun(p.task, time.Since(p.createdAt)) } -func (w *worker) finish(p *promise, rs backend.RunStatus, err *influxdb.Error) { +func (w *worker) finish(p *promise, rs backend.RunStatus, err error) { // trace span, ctx := tracing.StartSpanFromContext(p.ctx) defer span.Finish() @@ -329,9 +329,19 @@ func (w *worker) finish(p *promise, rs backend.RunStatus, err *influxdb.Error) { w.te.metrics.FinishRun(p.task, rs, rd) // log error - if err.Err != nil { + if err != nil { + w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), err.Error()) w.te.logger.Debug("execution failed", zap.Error(err), zap.String("taskID", p.task.ID.String())) w.te.metrics.LogError(p.task.Type, err) + + if backend.IsUnrecoverable(err) { + // if we get an error that requires user intervention to fix, deactivate the task and alert the user + inactive := string(backend.TaskInactive) + w.te.ts.UpdateTask(p.ctx, p.task.ID, influxdb.TaskUpdate{Status: &inactive}) + // and add to run logs + w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), fmt.Sprintf("Task deactivated after encountering unrecoverable error: %v", err.Error())) + } + p.err = err } else { w.te.logger.Debug("Completed successfully", zap.String("taskID", p.task.ID.String())) @@ -385,10 +395,6 @@ func (w *worker) executeQuery(p *promise) { it.Release() - if runErr == nil { - runErr = it.Err() - } - // log the statistics on the run stats := it.Statistics() @@ -397,7 +403,17 @@ func (w *worker) executeQuery(p *promise) { w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), string(b)) } - w.finish(p, backend.RunSuccess, influxdb.ErrResultIteratorError(runErr)) + if runErr != nil { + w.finish(p, backend.RunFail, influxdb.ErrRunExecutionError(runErr)) + return + } + + if it.Err() != nil { + w.finish(p, backend.RunFail, influxdb.ErrResultIteratorError(it.Err())) + return + } + + w.finish(p, backend.RunSuccess, nil) } // RunsActive returns the current number of workers, which is equivalent to diff --git a/task/backend/executor/task_executor_test.go b/task/backend/executor/task_executor_test.go index 74db589506..1b833a30cc 100644 --- a/task/backend/executor/task_executor_test.go +++ b/task/backend/executor/task_executor_test.go @@ -55,6 +55,7 @@ func TestTaskExecutor(t *testing.T) { t.Run("LimitFunc", testLimitFunc) t.Run("Metrics", testMetrics) t.Run("IteratorFailure", testIteratorFailure) + t.Run("ErrorHandling", testErrorHandling) } func testQuerySuccess(t *testing.T) { @@ -415,3 +416,35 @@ func testIteratorFailure(t *testing.T) { t.Fatal("got no error when I should have") } } + +func testErrorHandling(t *testing.T) { + t.Parallel() + tes := taskExecutorSystem(t) + + script := fmt.Sprintf(fmtTestScript, t.Name()) + ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) + task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script, Status: "active"}) + if err != nil { + t.Fatal(err) + } + + // encountering a bucket not found error should deactivate the task + forcedErr := errors.New("could not find bucket") + tes.svc.FailNextQuery(forcedErr) + + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + if err != nil { + t.Fatal(err) + } + + <-promise.Done() + + inactive, err := tes.i.FindTaskByID(context.Background(), task.ID) + if err != nil { + t.Fatal(err) + } + + if inactive.Status != "inactive" { + t.Fatal("expected task to be deactivated after permanent error") + } +} diff --git a/task_errors.go b/task_errors.go index 03dbc46bb0..9da09359c8 100644 --- a/task_errors.go +++ b/task_errors.go @@ -96,7 +96,7 @@ func ErrFluxParseError(err error) *Error { return &Error{ Code: EInvalid, Msg: fmt.Sprintf("could not parse Flux script; Err: %v", err), - Op: "kv/taskExecutor", + Op: "taskExecutor", Err: err, } } @@ -106,7 +106,7 @@ func ErrQueryError(err error) *Error { return &Error{ Code: EInternal, Msg: fmt.Sprintf("unexpected error from queryd; Err: %v", err), - Op: "kv/taskExecutor", + Op: "taskExecutor", Err: err, } } @@ -116,7 +116,7 @@ func ErrResultIteratorError(err error) *Error { return &Error{ Code: EInvalid, Msg: fmt.Sprintf("Error exhausting result iterator; Err: %v", err), - Op: "kv/taskExecutor", + Op: "taskExecutor", Err: err, } } @@ -125,7 +125,7 @@ func ErrInternalTaskServiceError(err error) *Error { return &Error{ Code: EInternal, Msg: fmt.Sprintf("unexpected error in tasks; Err: %v", err), - Op: "kv/task", + Op: "task", Err: err, } } @@ -135,7 +135,7 @@ func ErrUnexpectedTaskBucketErr(err error) *Error { return &Error{ Code: EInternal, Msg: fmt.Sprintf("unexpected error retrieving task bucket; Err: %v", err), - Op: "kv/taskBucket", + Op: "taskBucket", Err: err, } } @@ -143,9 +143,9 @@ func ErrUnexpectedTaskBucketErr(err error) *Error { // ErrTaskTimeParse an error for time parsing errors func ErrTaskTimeParse(err error) *Error { return &Error{ - Code: EInvalid, + Code: EInternal, Msg: fmt.Sprintf("unexpected error parsing time; Err: %v", err), - Op: "kv/taskCron", + Op: "taskCron", Err: err, } } @@ -154,7 +154,7 @@ func ErrTaskOptionParse(err error) *Error { return &Error{ Code: EInvalid, Msg: fmt.Sprintf("invalid options; Err: %v", err), - Op: "kv/taskOptions", + Op: "taskOptions", Err: err, } } @@ -171,7 +171,7 @@ func ErrCouldNotLogError(err error) *Error { return &Error{ Code: EInternal, Msg: fmt.Sprintf("unable to log error; Err: %v", err), - Op: "kv/taskScheduler", + Op: "taskScheduler", Err: err, } } @@ -180,7 +180,7 @@ func ErrJsonMarshalError(err error) *Error { return &Error{ Code: EInvalid, Msg: fmt.Sprintf("unable to marshal JSON; Err: %v", err), - Op: "kv/taskScheduler", + Op: "taskScheduler", Err: err, } } @@ -189,7 +189,7 @@ func ErrRunExecutionError(err error) *Error { return &Error{ Code: EInternal, Msg: fmt.Sprintf("could not execute task run; Err: %v", err), - Op: "kv/taskExecutor", + Op: "taskExecutor", Err: err, } }