feat(tasks): deactivate task on unrecoverable error (#15369)

pull/15374/head
Alirie Gray 2019-10-09 13:51:03 -07:00 committed by GitHub
parent faa3f1e22c
commit be28de8fbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 20 deletions

View File

@ -0,0 +1,26 @@
package backend
import (
"strings"
)
// IsUnrecoverable takes in an error and determines if it is permanent (requiring user intervention to fix)
func IsUnrecoverable(err error) bool {
if err == nil {
return false
}
errString := err.Error()
// missing bucket requires user intervention to resolve
if strings.Contains(errString, "could not find bucket") {
return true
}
// unparseable Flux requires user intervention to resolve
if strings.Contains(errString, "could not parse Flux script") {
return true
}
return false
}

View File

@ -131,8 +131,13 @@ func (em *ExecutorMetrics) FinishRun(task *influxdb.Task, status backend.RunStat
}
// LogError increments the count of errors.
func (em *ExecutorMetrics) LogError(taskType string, err *influxdb.Error) {
em.errorsCounter.WithLabelValues(taskType, err.Code).Inc()
func (em *ExecutorMetrics) LogError(taskType string, err error) {
switch e := err.(type) {
case *influxdb.Error:
em.errorsCounter.WithLabelValues(taskType, e.Code).Inc()
default:
em.errorsCounter.WithLabelValues(taskType, "unknown").Inc()
}
}
// Describe returns all descriptions associated with the run collector.

View File

@ -313,7 +313,7 @@ func (w *worker) start(p *promise) {
w.te.metrics.StartRun(p.task, time.Since(p.createdAt))
}
func (w *worker) finish(p *promise, rs backend.RunStatus, err *influxdb.Error) {
func (w *worker) finish(p *promise, rs backend.RunStatus, err error) {
// trace
span, ctx := tracing.StartSpanFromContext(p.ctx)
defer span.Finish()
@ -329,9 +329,19 @@ func (w *worker) finish(p *promise, rs backend.RunStatus, err *influxdb.Error) {
w.te.metrics.FinishRun(p.task, rs, rd)
// log error
if err.Err != nil {
if err != nil {
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), err.Error())
w.te.logger.Debug("execution failed", zap.Error(err), zap.String("taskID", p.task.ID.String()))
w.te.metrics.LogError(p.task.Type, err)
if backend.IsUnrecoverable(err) {
// if we get an error that requires user intervention to fix, deactivate the task and alert the user
inactive := string(backend.TaskInactive)
w.te.ts.UpdateTask(p.ctx, p.task.ID, influxdb.TaskUpdate{Status: &inactive})
// and add to run logs
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), fmt.Sprintf("Task deactivated after encountering unrecoverable error: %v", err.Error()))
}
p.err = err
} else {
w.te.logger.Debug("Completed successfully", zap.String("taskID", p.task.ID.String()))
@ -385,10 +395,6 @@ func (w *worker) executeQuery(p *promise) {
it.Release()
if runErr == nil {
runErr = it.Err()
}
// log the statistics on the run
stats := it.Statistics()
@ -397,7 +403,17 @@ func (w *worker) executeQuery(p *promise) {
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), string(b))
}
w.finish(p, backend.RunSuccess, influxdb.ErrResultIteratorError(runErr))
if runErr != nil {
w.finish(p, backend.RunFail, influxdb.ErrRunExecutionError(runErr))
return
}
if it.Err() != nil {
w.finish(p, backend.RunFail, influxdb.ErrResultIteratorError(it.Err()))
return
}
w.finish(p, backend.RunSuccess, nil)
}
// RunsActive returns the current number of workers, which is equivalent to

View File

@ -55,6 +55,7 @@ func TestTaskExecutor(t *testing.T) {
t.Run("LimitFunc", testLimitFunc)
t.Run("Metrics", testMetrics)
t.Run("IteratorFailure", testIteratorFailure)
t.Run("ErrorHandling", testErrorHandling)
}
func testQuerySuccess(t *testing.T) {
@ -415,3 +416,35 @@ func testIteratorFailure(t *testing.T) {
t.Fatal("got no error when I should have")
}
}
func testErrorHandling(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, influxdb.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script, Status: "active"})
if err != nil {
t.Fatal(err)
}
// encountering a bucket not found error should deactivate the task
forcedErr := errors.New("could not find bucket")
tes.svc.FailNextQuery(forcedErr)
promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
<-promise.Done()
inactive, err := tes.i.FindTaskByID(context.Background(), task.ID)
if err != nil {
t.Fatal(err)
}
if inactive.Status != "inactive" {
t.Fatal("expected task to be deactivated after permanent error")
}
}

View File

@ -96,7 +96,7 @@ func ErrFluxParseError(err error) *Error {
return &Error{
Code: EInvalid,
Msg: fmt.Sprintf("could not parse Flux script; Err: %v", err),
Op: "kv/taskExecutor",
Op: "taskExecutor",
Err: err,
}
}
@ -106,7 +106,7 @@ func ErrQueryError(err error) *Error {
return &Error{
Code: EInternal,
Msg: fmt.Sprintf("unexpected error from queryd; Err: %v", err),
Op: "kv/taskExecutor",
Op: "taskExecutor",
Err: err,
}
}
@ -116,7 +116,7 @@ func ErrResultIteratorError(err error) *Error {
return &Error{
Code: EInvalid,
Msg: fmt.Sprintf("Error exhausting result iterator; Err: %v", err),
Op: "kv/taskExecutor",
Op: "taskExecutor",
Err: err,
}
}
@ -125,7 +125,7 @@ func ErrInternalTaskServiceError(err error) *Error {
return &Error{
Code: EInternal,
Msg: fmt.Sprintf("unexpected error in tasks; Err: %v", err),
Op: "kv/task",
Op: "task",
Err: err,
}
}
@ -135,7 +135,7 @@ func ErrUnexpectedTaskBucketErr(err error) *Error {
return &Error{
Code: EInternal,
Msg: fmt.Sprintf("unexpected error retrieving task bucket; Err: %v", err),
Op: "kv/taskBucket",
Op: "taskBucket",
Err: err,
}
}
@ -143,9 +143,9 @@ func ErrUnexpectedTaskBucketErr(err error) *Error {
// ErrTaskTimeParse an error for time parsing errors
func ErrTaskTimeParse(err error) *Error {
return &Error{
Code: EInvalid,
Code: EInternal,
Msg: fmt.Sprintf("unexpected error parsing time; Err: %v", err),
Op: "kv/taskCron",
Op: "taskCron",
Err: err,
}
}
@ -154,7 +154,7 @@ func ErrTaskOptionParse(err error) *Error {
return &Error{
Code: EInvalid,
Msg: fmt.Sprintf("invalid options; Err: %v", err),
Op: "kv/taskOptions",
Op: "taskOptions",
Err: err,
}
}
@ -171,7 +171,7 @@ func ErrCouldNotLogError(err error) *Error {
return &Error{
Code: EInternal,
Msg: fmt.Sprintf("unable to log error; Err: %v", err),
Op: "kv/taskScheduler",
Op: "taskScheduler",
Err: err,
}
}
@ -180,7 +180,7 @@ func ErrJsonMarshalError(err error) *Error {
return &Error{
Code: EInvalid,
Msg: fmt.Sprintf("unable to marshal JSON; Err: %v", err),
Op: "kv/taskScheduler",
Op: "taskScheduler",
Err: err,
}
}
@ -189,7 +189,7 @@ func ErrRunExecutionError(err error) *Error {
return &Error{
Code: EInternal,
Msg: fmt.Sprintf("could not execute task run; Err: %v", err),
Op: "kv/taskExecutor",
Op: "taskExecutor",
Err: err,
}
}