diff --git a/http/task_service.go b/http/task_service.go index df40bfa474..92c3b679b8 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -644,7 +644,7 @@ func (h *TaskHandler) handleUpdateTask(w http.ResponseWriter, r *http.Request) { Err: err, Msg: "failed to update task", } - if err.Err == backend.ErrTaskNotFound { + if err.Err == &influxdb.ErrTaskNotFound { err.Code = platform.ENotFound } EncodeError(ctx, err, w) @@ -721,7 +721,7 @@ func (h *TaskHandler) handleDeleteTask(w http.ResponseWriter, r *http.Request) { Err: err, Msg: "failed to delete task", } - if err.Err == backend.ErrTaskNotFound { + if err.Err == &influxdb.ErrTaskNotFound { err.Code = platform.ENotFound } EncodeError(ctx, err, w) @@ -798,7 +798,7 @@ func (h *TaskHandler) handleGetLogs(w http.ResponseWriter, r *http.Request) { Err: err, Msg: "failed to find task logs", } - if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrNoRunsFound { + if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrNoRunsFound { err.Code = platform.ENotFound } EncodeError(ctx, err, w) @@ -890,7 +890,7 @@ func (h *TaskHandler) handleGetRuns(w http.ResponseWriter, r *http.Request) { Err: err, Msg: "failed to find runs", } - if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrNoRunsFound { + if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrNoRunsFound { err.Code = platform.ENotFound } EncodeError(ctx, err, w) @@ -941,7 +941,7 @@ func decodeGetRunsRequest(ctx context.Context, r *http.Request) (*getRunsRequest } if i < 1 || i > influxdb.TaskMaxPageSize { - return nil, backend.ErrOutOfBoundsLimit + return nil, &influxdb.ErrOutOfBoundsLimit } req.filter.Limit = i } @@ -994,7 +994,7 @@ func (h *TaskHandler) handleForceRun(w http.ResponseWriter, r *http.Request) { Err: err, Msg: "failed to force run", } - if err.Err == backend.ErrTaskNotFound { + if err.Err == &influxdb.ErrTaskNotFound { err.Code = platform.ENotFound } EncodeError(ctx, err, w) @@ -1093,7 +1093,7 @@ func (h *TaskHandler) handleGetRun(w http.ResponseWriter, r *http.Request) { Err: err, Msg: "failed to find run", } - if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrRunNotFound { + if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrRunNotFound { err.Code = platform.ENotFound } EncodeError(ctx, err, w) @@ -1199,7 +1199,7 @@ func (h *TaskHandler) handleCancelRun(w http.ResponseWriter, r *http.Request) { Err: err, Msg: "failed to cancel run", } - if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrRunNotFound { + if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrRunNotFound { err.Code = platform.ENotFound } EncodeError(ctx, err, w) @@ -1250,7 +1250,7 @@ func (h *TaskHandler) handleRetryRun(w http.ResponseWriter, r *http.Request) { Err: err, Msg: "failed to retry run", } - if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrRunNotFound { + if err.Err == &influxdb.ErrTaskNotFound || err.Err == &platform.ErrRunNotFound { err.Code = platform.ENotFound } EncodeError(ctx, err, w) @@ -1387,7 +1387,7 @@ func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platfor // ErrTaskNotFound is expected as part of the FindTaskByID contract, // so return that actual error instead of a different error that looks like it. // TODO cleanup backend task service error implementation - return nil, backend.ErrTaskNotFound + return nil, &influxdb.ErrTaskNotFound } return nil, err } @@ -1645,7 +1645,7 @@ func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([ } if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize { - return nil, 0, backend.ErrOutOfBoundsLimit + return nil, 0, &influxdb.ErrOutOfBoundsLimit } val.Set("limit", strconv.Itoa(filter.Limit)) @@ -1715,7 +1715,7 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID) // ErrRunNotFound is expected as part of the FindRunByID contract, // so return that actual error instead of a different error that looks like it. // TODO cleanup backend error implementation - return nil, backend.ErrRunNotFound + return nil, &platform.ErrRunNotFound } return nil, err @@ -1759,7 +1759,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (* // ErrRunNotFound is expected as part of the RetryRun contract, // so return that actual error instead of a different error that looks like it. // TODO cleanup backend task error implementation - return nil, backend.ErrRunNotFound + return nil, &platform.ErrRunNotFound } // RequestStillQueuedError is also part of the contract. if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil { @@ -1806,7 +1806,7 @@ func (t TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduled if platform.ErrorCode(err) == platform.ENotFound { // ErrRunNotFound is expected as part of the RetryRun contract, // so return that actual error instead of a different error that looks like it. - return nil, backend.ErrRunNotFound + return nil, &influxdb.ErrRunNotFound } // RequestStillQueuedError is also part of the contract. diff --git a/http/task_service_test.go b/http/task_service_test.go index f6a7eab6e3..1aa9ad27c4 100644 --- a/http/task_service_test.go +++ b/http/task_service_test.go @@ -802,7 +802,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { return &platform.Task{ID: taskID, Organization: "o"}, nil } - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound }, }, method: http.MethodGet, @@ -818,7 +818,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { return &platform.Task{ID: taskID, Organization: "o"}, nil } - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound }, }, method: http.MethodPatch, @@ -835,7 +835,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { return nil } - return backend.ErrTaskNotFound + return &platform.ErrTaskNotFound }, }, method: http.MethodDelete, @@ -851,7 +851,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { return nil, 0, nil } - return nil, 0, backend.ErrTaskNotFound + return nil, 0, &platform.ErrTaskNotFound }, }, method: http.MethodGet, @@ -864,10 +864,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { svc: &mock.TaskService{ FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) { if f.Task != taskID { - return nil, 0, backend.ErrTaskNotFound + return nil, 0, &platform.ErrTaskNotFound } if *f.Run != runID { - return nil, 0, backend.ErrNoRunsFound + return nil, 0, &platform.ErrNoRunsFound } return nil, 0, nil @@ -883,7 +883,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { svc: &mock.TaskService{ FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) { if f.Task != taskID { - return nil, 0, backend.ErrTaskNotFound + return nil, 0, &platform.ErrTaskNotFound } return nil, 0, nil @@ -899,7 +899,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { svc: &mock.TaskService{ FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) { if f.Task != taskID { - return nil, 0, backend.ErrNoRunsFound + return nil, 0, &platform.ErrNoRunsFound } return nil, 0, nil @@ -915,7 +915,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { svc: &mock.TaskService{ ForceRunFn: func(_ context.Context, tid platform.ID, _ int64) (*platform.Run, error) { if tid != taskID { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil @@ -932,10 +932,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { svc: &mock.TaskService{ FindRunByIDFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) { if tid != taskID { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } if rid != runID { - return nil, backend.ErrRunNotFound + return nil, &platform.ErrRunNotFound } return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil @@ -951,10 +951,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { svc: &mock.TaskService{ RetryRunFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) { if tid != taskID { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } if rid != runID { - return nil, backend.ErrRunNotFound + return nil, &platform.ErrRunNotFound } return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil @@ -970,10 +970,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { svc: &mock.TaskService{ CancelRunFn: func(_ context.Context, tid, rid platform.ID) error { if tid != taskID { - return backend.ErrTaskNotFound + return &platform.ErrTaskNotFound } if rid != runID { - return backend.ErrRunNotFound + return &platform.ErrRunNotFound } return nil @@ -1444,7 +1444,7 @@ func TestTaskHandler_Sessions(t *testing.T) { FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) { if id != taskID { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } return &platform.Task{ @@ -1536,7 +1536,7 @@ func TestTaskHandler_Sessions(t *testing.T) { FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) { if id != taskID { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } return &platform.Task{ @@ -1632,7 +1632,7 @@ func TestTaskHandler_Sessions(t *testing.T) { FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) { if id != taskID { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } return &platform.Task{ @@ -1727,7 +1727,7 @@ func TestTaskHandler_Sessions(t *testing.T) { FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) { if id != taskID { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } return &platform.Task{ diff --git a/kv/task.go b/kv/task.go index 5ee1ba79bc..2eef8761f4 100644 --- a/kv/task.go +++ b/kv/task.go @@ -3,7 +3,6 @@ package kv import ( "context" "encoding/json" - "fmt" "strings" "time" @@ -14,84 +13,6 @@ import ( cron "gopkg.in/robfig/cron.v2" ) -var ( - // ErrInvalidTaskID error object for bad id's - ErrInvalidTaskID = &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "invalid id", - } - - // ErrTaskNotFound error when task cant be found - ErrTaskNotFound = &influxdb.Error{ - Code: influxdb.ENotFound, - Msg: "task not found", - } - - // ErrRunNotFound error when run cant be found - ErrRunNotFound = &influxdb.Error{ - Code: influxdb.ENotFound, - Msg: "run not found", - } - - ErrPageSizeTooSmall = &influxdb.Error{ - Msg: "cannot have negative page limit", - Code: influxdb.EInvalid, - } - - ErrPageSizeTooLarge = &influxdb.Error{ - Msg: fmt.Sprintf("cannot use page size larger then %d", influxdb.MaxPageSize), - Code: influxdb.EInvalid, - } - - ErrOrgNotFound = &influxdb.Error{ - Msg: "organization not found", - Code: influxdb.ENotFound, - } - - ErrTaskRunAlreadyQueued = &influxdb.Error{ - Msg: "run already queued", - Code: influxdb.EConflict, - } -) - -func ErrInternalTaskServiceError(err error) *influxdb.Error { - return &influxdb.Error{ - Code: influxdb.EInternal, - Msg: fmt.Sprintf("unexpected error in tasks; Err: %v", err), - Op: "kv/task", - Err: err, - } -} - -// ErrUnexpectedTaskBucketErr a generic error we can use when we rail to retrieve a bucket -func ErrUnexpectedTaskBucketErr(err error) *influxdb.Error { - return &influxdb.Error{ - Code: influxdb.EInternal, - Msg: fmt.Sprintf("unexpected error retrieving task bucket; Err: %v", err), - Op: "kv/taskBucket", - Err: err, - } -} - -// ErrTaskTimeParse an error for time parsing errors -func ErrTaskTimeParse(err error) *influxdb.Error { - return &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: fmt.Sprintf("unexpected error parsing time; Err: %v", err), - Op: "kv/taskCron", - Err: err, - } -} - -func ErrTaskOptionParse(err error) *influxdb.Error { - return &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: fmt.Sprintf("invalid options; Err: %v", err), - Op: "kv/taskOptions", - Err: err, - } -} - // Task Storage Schema // taskBucket: // : task data storage @@ -152,19 +73,19 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf b, err := tx.Bucket(taskBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } v, err := b.Get(taskKey) if IsNotFound(err) { - return nil, backend.ErrTaskNotFound + return nil, &influxdb.ErrTaskNotFound } if err != nil { return nil, err } t := &influxdb.Task{} if err := json.Unmarshal(v, t); err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID) if err != nil { @@ -230,10 +151,10 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt // complain about limits if filter.Limit < 0 { - return nil, 0, ErrPageSizeTooSmall + return nil, 0, &influxdb.ErrPageSizeTooSmall } if filter.Limit > influxdb.TaskMaxPageSize { - return nil, 0, ErrPageSizeTooLarge + return nil, 0, &influxdb.ErrPageSizeTooLarge } if filter.Limit == 0 { filter.Limit = influxdb.TaskDefaultPageSize @@ -262,7 +183,7 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt // findTasksByUser is a subset of the find tasks function. Used for cleanliness func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { if filter.User == nil { - return nil, 0, ErrTaskNotFound + return nil, 0, &influxdb.ErrTaskNotFound } var org *influxdb.Organization var err error @@ -295,10 +216,10 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta for _, m := range maps { task, err := s.findTaskByID(ctx, tx, m.ResourceID) - if err != nil && err == backend.ErrTaskNotFound { + if err != nil && err == &influxdb.ErrTaskNotFound { return nil, 0, err } - if err == backend.ErrTaskNotFound { + if err == &influxdb.ErrTaskNotFound { continue } @@ -332,19 +253,19 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task } if org == nil { - return nil, 0, ErrTaskNotFound + return nil, 0, &influxdb.ErrTaskNotFound } var ts []*influxdb.Task indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { - return nil, 0, ErrUnexpectedTaskBucketErr(err) + return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } c, err := indexBucket.Cursor() if err != nil { - return nil, 0, ErrUnexpectedTaskBucketErr(err) + return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } // we can filter by orgID if filter.After != nil { @@ -360,17 +281,17 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task // orgID key, err := org.ID.Encode() if err != nil { - return nil, 0, ErrInvalidTaskID + return nil, 0, &influxdb.ErrInvalidTaskID } k, v := c.Seek(key) if k != nil { id, err := influxdb.IDFromString(string(v)) if err != nil { - return nil, 0, ErrInvalidTaskID + return nil, 0, &influxdb.ErrInvalidTaskID } t, err := s.findTaskByID(ctx, tx, *id) - if err != nil && err != backend.ErrTaskNotFound { + if err != nil && err != &influxdb.ErrTaskNotFound { // we might have some crufty index's return nil, 0, err } @@ -396,12 +317,12 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task id, err := influxdb.IDFromString(string(v)) if err != nil { - return nil, 0, ErrInvalidTaskID + return nil, 0, &influxdb.ErrInvalidTaskID } t, err := s.findTaskByID(ctx, tx, *id) if err != nil { - if err == backend.ErrTaskNotFound { + if err == &influxdb.ErrTaskNotFound { // we might have some crufty index's continue } @@ -432,12 +353,12 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF taskBucket, err := tx.Bucket(taskBucket) if err != nil { - return nil, 0, ErrUnexpectedTaskBucketErr(err) + return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } c, err := taskBucket.Cursor() if err != nil { - return nil, 0, ErrUnexpectedTaskBucketErr(err) + return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err) } // we can filter by orgID if filter.After != nil { @@ -457,7 +378,7 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF t := &influxdb.Task{} if err := json.Unmarshal(v, t); err != nil { - return nil, 0, ErrInternalTaskServiceError(err) + return nil, 0, influxdb.ErrInternalTaskServiceError(err) } latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID) if err != nil { @@ -484,7 +405,7 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF } t := &influxdb.Task{} if err := json.Unmarshal(v, t); err != nil { - return nil, 0, ErrInternalTaskServiceError(err) + return nil, 0, influxdb.ErrInternalTaskServiceError(err) } latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID) if err != nil { @@ -557,12 +478,12 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) } } if org == nil { - return nil, ErrOrgNotFound + return nil, &influxdb.ErrOrgNotFound } opt, err := options.FromScript(tc.Flux) if err != nil { - return nil, ErrTaskOptionParse(err) + return nil, influxdb.ErrTaskOptionParse(err) } if tc.Status == "" { @@ -590,17 +511,17 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) taskBucket, err := tx.Bucket(taskBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } taskBytes, err := json.Marshal(task) if err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } taskKey, err := taskKey(task.ID) @@ -616,13 +537,13 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) // write the task err = taskBucket.Put(taskKey, taskBytes) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } // write the org index err = indexBucket.Put(orgKey, taskKey) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if err := s.createUserResourceMapping(ctx, tx, &influxdb.UserResourceMapping{ ResourceType: influxdb.TasksResourceType, @@ -670,7 +591,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf options, err := options.FromScript(*upd.Flux) if err != nil { - return nil, ErrTaskOptionParse(err) + return nil, influxdb.ErrTaskOptionParse(err) } task.Name = options.Name task.Every = options.Every.String() @@ -705,7 +626,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf // save the updated task bucket, err := tx.Bucket(taskBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskKey(id) if err != nil { @@ -714,7 +635,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf taskBytes, err := json.Marshal(task) if err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } return task, bucket.Put(key, taskBytes) @@ -739,17 +660,17 @@ func (s *Service) DeleteTask(ctx context.Context, id influxdb.ID) error { func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { taskBucket, err := tx.Bucket(taskBucket) if err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } runBucket, err := tx.Bucket(taskRunBucket) if err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } indexBucket, err := tx.Bucket(taskIndexBucket) if err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } // retrieve the task @@ -765,7 +686,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { } if err := indexBucket.Delete(orgKey); err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } // remove latest completed @@ -775,7 +696,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { } if err := runBucket.Delete(lastCompletedKey); err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } // remove the runs @@ -791,7 +712,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { } if err := runBucket.Delete(key); err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } } // remove the task @@ -801,7 +722,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error { } if err := taskBucket.Delete(key); err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } return s.deleteUserResourceMapping(ctx, tx, influxdb.UserResourceMappingFilter{ @@ -878,7 +799,7 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter } if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize { - return nil, 0, backend.ErrOutOfBoundsLimit + return nil, 0, &influxdb.ErrOutOfBoundsLimit } var runs []*influxdb.Run @@ -930,7 +851,7 @@ func (s *Service) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (* func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) { bucket, err := tx.Bucket(taskRunBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskRunKey(taskID, runID) @@ -940,14 +861,14 @@ func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb runBytes, err := bucket.Get(key) if err != nil { if IsNotFound(err) { - return nil, ErrRunNotFound + return nil, &influxdb.ErrRunNotFound } - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } run := &influxdb.Run{} err = json.Unmarshal(runBytes, run) if err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } return run, nil @@ -978,12 +899,12 @@ func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.I // save bucket, err := tx.Bucket(taskBucket) if err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { - return ErrInternalTaskServiceError(err) + return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, runID) @@ -992,7 +913,7 @@ func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.I } if err := bucket.Put(runKey, runBytes); err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil @@ -1028,7 +949,7 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID // add a clean copy of the run to the manual runs bucket, err := tx.Bucket(taskRunBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskManualRunKey(taskID) @@ -1040,15 +961,15 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID runsBytes, err := bucket.Get(key) if err != nil { if err != ErrKeyNotFound { - return nil, ErrRunNotFound + return nil, &influxdb.ErrRunNotFound } - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if runsBytes != nil { if err := json.Unmarshal(runsBytes, &runs); err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } } @@ -1057,11 +978,11 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID // save manual runs runsBytes, err = json.Marshal(runs) if err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } if err := bucket.Put(key, runsBytes); err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil @@ -1097,7 +1018,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched // add a clean copy of the run to the manual runs bucket, err := tx.Bucket(taskRunBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } runs, err := s.manualRuns(ctx, tx, taskID) @@ -1108,7 +1029,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched // check to see if this run is already queued for _, run := range runs { if run.ScheduledFor == r.ScheduledFor { - return nil, ErrTaskRunAlreadyQueued + return nil, &influxdb.ErrTaskRunAlreadyQueued } } runs = append(runs, r) @@ -1116,7 +1037,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched // save manual runs runsBytes, err := json.Marshal(runs) if err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } key, err := taskManualRunKey(taskID) @@ -1125,7 +1046,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched } if err := bucket.Put(key, runsBytes); err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil @@ -1165,11 +1086,11 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, // save manual runs b, err := tx.Bucket(taskRunBucket) if err != nil { - return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err) + return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err) } mRunsBytes, err := json.Marshal(mRuns) if err != nil { - return backend.RunCreation{}, ErrInternalTaskServiceError(err) + return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err) } runsKey, err := taskManualRunKey(taskID) @@ -1178,12 +1099,12 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, } if err := b.Put(runsKey, mRunsBytes); err != nil { - return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err) + return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err) } // add mRun to the list of currently running mRunBytes, err := json.Marshal(mRun) if err != nil { - return backend.RunCreation{}, ErrInternalTaskServiceError(err) + return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, mRun.ID) @@ -1192,7 +1113,7 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, } if err := b.Put(runKey, mRunBytes); err != nil { - return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err) + return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err) } // return mRun @@ -1231,7 +1152,7 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, // the earliest it could have been completed is "created at" latestCompleted, err := time.Parse(time.RFC3339, task.CreatedAt) if err != nil { - return backend.RunCreation{}, ErrTaskTimeParse(err) + return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err) } // we could have a latest completed newer then the created at time. @@ -1281,21 +1202,21 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, // create a run if possible sch, err := cron.Parse(task.EffectiveCron()) if err != nil { - return backend.RunCreation{}, ErrTaskTimeParse(err) + return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err) } nowTime := time.Unix(now, 0) nextScheduled := sch.Next(latestCompleted).UTC() nextScheduledUnix := nextScheduled.Unix() offset := &options.Duration{} if err := offset.Parse(task.Offset); err != nil { - return backend.RunCreation{}, ErrTaskTimeParse(err) + return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err) } dueAt, err := offset.Add(nextScheduled) if err != nil { - return backend.RunCreation{}, ErrTaskTimeParse(err) + return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err) } if dueAt.After(nowTime) { - return backend.RunCreation{}, backend.RunNotYetDueError{DueAt: dueAt.Unix()} + return backend.RunCreation{}, influxdb.ErrRunNotDueYet(dueAt.Unix()) } id := s.IDGenerator.ID() @@ -1309,12 +1230,12 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, } b, err := tx.Bucket(taskRunBucket) if err != nil { - return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err) + return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { - return backend.RunCreation{}, ErrInternalTaskServiceError(err) + return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) @@ -1322,12 +1243,12 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, return backend.RunCreation{}, err } if err := b.Put(runKey, runBytes); err != nil { - return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err) + return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err) } nextDue, err := offset.Add(sch.Next(nextScheduled)) if err != nil { - return backend.RunCreation{}, ErrTaskTimeParse(err) + return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err) } // populate RunCreation return backend.RunCreation{ @@ -1362,12 +1283,12 @@ func (s *Service) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]* func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) { bucket, err := tx.Bucket(taskRunBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } c, err := bucket.Cursor() if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } var runs []*influxdb.Run @@ -1387,7 +1308,7 @@ func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.I } r := &influxdb.Run{} if err := json.Unmarshal(v, r); err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } // if the run no longer belongs to the task we are done @@ -1420,7 +1341,7 @@ func (s *Service) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influx func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) { b, err := tx.Bucket(taskRunBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskManualRunKey(taskID) if err != nil { @@ -1433,10 +1354,10 @@ func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([] if err == ErrKeyNotFound { return runs, nil } - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if err := json.Unmarshal(val, &runs); err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } return runs, nil @@ -1481,13 +1402,13 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I } bucket, err := tx.Bucket(taskRunBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } if rTime.After(lTime) { rb, err := json.Marshal(r) if err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } lKey, err := taskLatestCompletedKey(taskID) if err != nil { @@ -1495,7 +1416,7 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I } if err := bucket.Put(lKey, rb); err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } } @@ -1505,7 +1426,7 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I return nil, err } if err := bucket.Delete(key); err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } return r, nil @@ -1544,7 +1465,7 @@ func (s *Service) nextDueRun(ctx context.Context, tx Tx, taskID influxdb.ID) (in // create a run if possible sch, err := cron.Parse(task.EffectiveCron()) if err != nil { - return 0, ErrTaskTimeParse(err) + return 0, influxdb.ErrTaskTimeParse(err) } nextScheduled := sch.Next(latestCompleted).UTC() @@ -1582,12 +1503,12 @@ func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influ // save run b, err := tx.Bucket(taskRunBucket) if err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { - return ErrInternalTaskServiceError(err) + return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) @@ -1595,7 +1516,7 @@ func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influ return err } if err := b.Put(runKey, runBytes); err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil @@ -1625,12 +1546,12 @@ func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.I // save run b, err := tx.Bucket(taskRunBucket) if err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } runBytes, err := json.Marshal(run) if err != nil { - return ErrInternalTaskServiceError(err) + return influxdb.ErrInternalTaskServiceError(err) } runKey, err := taskRunKey(taskID, run.ID) @@ -1639,7 +1560,7 @@ func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.I } if err := b.Put(runKey, runBytes); err != nil { - return ErrUnexpectedTaskBucketErr(err) + return influxdb.ErrUnexpectedTaskBucketErr(err) } return nil @@ -1648,7 +1569,7 @@ func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.I func (s *Service) findLatestCompleted(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Run, error) { bucket, err := tx.Bucket(taskRunBucket) if err != nil { - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } key, err := taskLatestCompletedKey(id) if err != nil { @@ -1660,12 +1581,12 @@ func (s *Service) findLatestCompleted(ctx context.Context, tx Tx, id influxdb.ID if err == ErrKeyNotFound { return nil, nil } - return nil, ErrUnexpectedTaskBucketErr(err) + return nil, influxdb.ErrUnexpectedTaskBucketErr(err) } run := &influxdb.Run{} if err = json.Unmarshal(bytes, run); err != nil { - return nil, ErrInternalTaskServiceError(err) + return nil, influxdb.ErrInternalTaskServiceError(err) } return run, nil @@ -1686,7 +1607,7 @@ func (s *Service) findLatestCompletedTime(ctx context.Context, tx Tx, id influxd func taskKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { - return nil, ErrInvalidTaskID + return nil, &influxdb.ErrInvalidTaskID } return encodedID, nil } @@ -1694,7 +1615,7 @@ func taskKey(taskID influxdb.ID) ([]byte, error) { func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { - return nil, ErrInvalidTaskID + return nil, &influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/latestCompleted"), nil } @@ -1702,7 +1623,7 @@ func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) { func taskManualRunKey(taskID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { - return nil, ErrInvalidTaskID + return nil, &influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/manualRuns"), nil } @@ -1710,11 +1631,11 @@ func taskManualRunKey(taskID influxdb.ID) ([]byte, error) { func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) { encodedOrgID, err := orgID.Encode() if err != nil { - return nil, ErrInvalidTaskID + return nil, &influxdb.ErrInvalidTaskID } encodedID, err := taskID.Encode() if err != nil { - return nil, ErrInvalidTaskID + return nil, &influxdb.ErrInvalidTaskID } return []byte(string(encodedOrgID) + "/" + string(encodedID)), nil @@ -1723,11 +1644,11 @@ func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) { func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) { encodedID, err := taskID.Encode() if err != nil { - return nil, ErrInvalidTaskID + return nil, &influxdb.ErrInvalidTaskID } encodedRunID, err := runID.Encode() if err != nil { - return nil, ErrInvalidTaskID + return nil, &influxdb.ErrInvalidTaskID } return []byte(string(encodedID) + "/" + string(encodedRunID)), nil diff --git a/task/backend/analytical_storage.go b/task/backend/analytical_storage.go index 0c22508168..38f84daa79 100644 --- a/task/backend/analytical_storage.go +++ b/task/backend/analytical_storage.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "time" @@ -35,20 +34,6 @@ const ( taskSystemBucketID platform.ID = 10 ) -var ( - // ErrTaskNotFound indicates no task could be found for given parameters. - ErrTaskNotFound = errors.New("task not found") - - // ErrRunNotFound is returned when searching for a single run that doesn't exist. - ErrRunNotFound = errors.New("run not found") -) - -// ErrOutOfBoundsLimit is returned with FindRuns is called with an invalid filter limit. -var ErrOutOfBoundsLimit = &platform.Error{ - Code: platform.EUnprocessableEntity, - Msg: "run limit is out of bounds, must be between 1 and 500", -} - // NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware func NewAnalyticalStorage(logger *zap.Logger, ts influxdb.TaskService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage { return &AnalyticalStorage{ @@ -157,7 +142,7 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi } if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize { - return nil, 0, ErrOutOfBoundsLimit + return nil, 0, &influxdb.ErrOutOfBoundsLimit } runs, n, err := as.TaskService.FindRuns(ctx, filter) @@ -270,7 +255,7 @@ func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID infl } if len(re.runs) == 0 { - return nil, ErrRunNotFound + return nil, &platform.ErrRunNotFound } diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go index 51469f71c1..0af680be29 100644 --- a/task/backend/coordinator/coordinator.go +++ b/task/backend/coordinator/coordinator.go @@ -121,12 +121,12 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo // If disabling the task, do so before modifying the script. if task.Status != oldTask.Status && task.Status == string(backend.TaskInactive) { - if err := c.sch.ReleaseTask(id); err != nil && err != backend.ErrTaskNotClaimed { + if err := c.sch.ReleaseTask(id); err != nil && err != &platform.ErrTaskNotClaimed { return task, err } } - if err := c.sch.UpdateTask(ctx, task); err != nil && err != backend.ErrTaskNotClaimed { + if err := c.sch.UpdateTask(ctx, task); err != nil && err != &platform.ErrTaskNotClaimed { return task, err } @@ -139,7 +139,7 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo return task, err } - if err := c.sch.ClaimTask(ctx, task); err != nil && err != backend.ErrTaskAlreadyClaimed { + if err := c.sch.ClaimTask(ctx, task); err != nil && err != &platform.ErrTaskAlreadyClaimed { return task, err } } @@ -148,7 +148,7 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo } func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) error { - if err := c.sch.ReleaseTask(id); err != nil && err != backend.ErrTaskNotClaimed { + if err := c.sch.ReleaseTask(id); err != nil && err != &platform.ErrTaskNotClaimed { return err } diff --git a/task/backend/coordinator/coordinator_test.go b/task/backend/coordinator/coordinator_test.go index e5780a9422..fc68b69162 100644 --- a/task/backend/coordinator/coordinator_test.go +++ b/task/backend/coordinator/coordinator_test.go @@ -58,7 +58,7 @@ func inmemTaskService() platform.TaskService { defer mu.Unlock() t, ok := tasks[id] if !ok { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } if upd.Flux != nil { t.Flux = *upd.Flux @@ -78,7 +78,7 @@ func inmemTaskService() platform.TaskService { defer mu.Unlock() t, ok := tasks[id] if !ok { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } newt := *t return &newt, nil @@ -100,7 +100,7 @@ func inmemTaskService() platform.TaskService { defer mu.Unlock() t, ok := tasks[id] if !ok { - return nil, backend.ErrTaskNotFound + return nil, &platform.ErrTaskNotFound } return &platform.Run{ID: id, TaskID: t.ID, ScheduledFor: time.Unix(scheduledFor, 0).Format(time.RFC3339)}, nil @@ -276,7 +276,7 @@ func TestCoordinator_DeleteUnclaimedTask(t *testing.T) { t.Fatal(err) } - if _, err := ts.FindTaskByID(context.Background(), task.ID); err != backend.ErrTaskNotFound { + if _, err := ts.FindTaskByID(context.Background(), task.ID); err != &platform.ErrTaskNotFound { t.Fatalf("expected deleted task not to be found; got %v", err) } } diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index b87f1614b9..b65a4ec177 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -126,7 +126,7 @@ func (p *syncRunPromise) Wait() (backend.RunResult, error) { } func (p *syncRunPromise) Cancel() { - p.finish(nil, backend.ErrRunCanceled) + p.finish(nil, &influxdb.ErrRunCanceled) } func (p *syncRunPromise) finish(res *runResult, err error) { @@ -312,7 +312,7 @@ func (p *asyncRunPromise) Wait() (backend.RunResult, error) { } func (p *asyncRunPromise) Cancel() { - p.finish(nil, backend.ErrRunCanceled) + p.finish(nil, &influxdb.ErrRunCanceled) } // followQuery waits for the query to become ready and sets p's results. diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 5d37535156..b0e39fbcc3 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -417,7 +417,7 @@ func testExecutorPromiseCancel(t *testing.T, fn createSysFn) { rp.Cancel() res, err := rp.Wait() - if err != backend.ErrRunCanceled { + if err != &platform.ErrRunCanceled { t.Fatalf("expected ErrRunCanceled, got %v", err) } if res != nil { diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index 2215f1b1cf..28100a74db 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -197,13 +197,13 @@ func (s *TickScheduler) CancelRun(_ context.Context, taskID, runID platform.ID) defer s.schedulerMu.Unlock() ts, ok := s.taskSchedulers[taskID] if !ok { - return ErrTaskNotFound + return &platform.ErrTaskNotFound } ts.runningMu.Lock() c, ok := ts.running[runID] if !ok { ts.runningMu.Unlock() - return ErrRunNotFound + return &platform.ErrRunNotFound } ts.runningMu.Unlock() if c.CancelFunc != nil { @@ -302,7 +302,7 @@ func (s *TickScheduler) ClaimTask(authCtx context.Context, task *platform.Task) _, ok := s.taskSchedulers[task.ID] if ok { - return ErrTaskAlreadyClaimed + return &platform.ErrTaskAlreadyClaimed } s.taskSchedulers[task.ID] = ts @@ -336,7 +336,7 @@ func (s *TickScheduler) UpdateTask(authCtx context.Context, task *platform.Task) ts, ok := s.taskSchedulers[task.ID] if !ok { - return ErrTaskNotClaimed + return &platform.ErrTaskNotClaimed } ts.task = task @@ -390,7 +390,7 @@ func (s *TickScheduler) ReleaseTask(taskID platform.ID) error { t, ok := s.taskSchedulers[taskID] if !ok { - return ErrTaskNotClaimed + return &platform.ErrTaskNotClaimed } t.Cancel() @@ -746,7 +746,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za rr, err := rp.Wait() close(ready) if err != nil { - if err == ErrRunCanceled { + if err == &platform.ErrRunCanceled { r.updateRunState(qr, RunCanceled, runLogger) errMsg = "Waiting for execution result failed, " + errMsg // Move on to the next execution, for a canceled run. diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index 7820d35570..3c7e583f15 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -64,7 +64,7 @@ func TestScheduler_Cancelation(t *testing.T) { // check for when we cancel something already canceled time.Sleep(500 * time.Millisecond) - if err = o.CancelRun(context.Background(), task.ID, run.ID); err != backend.ErrRunNotFound { + if err = o.CancelRun(context.Background(), task.ID, run.ID); err != &platform.ErrRunNotFound { t.Fatalf("expected ErrRunNotFound but got %s", err) } } diff --git a/task/backend/task.go b/task/backend/task.go index 74df8493c6..5b745e4b07 100644 --- a/task/backend/task.go +++ b/task/backend/task.go @@ -2,7 +2,6 @@ package backend import ( "context" - "errors" "fmt" "time" @@ -15,7 +14,7 @@ type TaskControlService interface { // CreateNextRun attempts to create a new run. // The new run's ScheduledFor is assigned the earliest possible time according to task's cron, // that is later than any in-progress run and LatestCompleted run. - // If the run's ScheduledFor would be later than the passed-in now, CreateNextRun returns a RunNotYetDueError. + // If the run's ScheduledFor would be later than the passed-in now, CreateNextRun returns an ErrRunNotDueYet. CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (RunCreation, error) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) @@ -70,30 +69,6 @@ func (r RunStatus) String() string { panic(fmt.Sprintf("unknown RunStatus: %d", r)) } -var ( - // ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally. - ErrRunCanceled = errors.New("run canceled") - - // ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not. - ErrTaskNotClaimed = errors.New("task not claimed") - - // ErrNoRunsFound is returned when searching for a range of runs, but none are found. - ErrNoRunsFound = errors.New("no matching runs found") - - // ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is. - ErrTaskAlreadyClaimed = errors.New("task already claimed") -) - -// RunNotYetDueError is returned from CreateNextRun if a run is not yet due. -type RunNotYetDueError struct { - // DueAt is the unix timestamp of when the next run is due. - DueAt int64 -} - -func (e RunNotYetDueError) Error() string { - return "run not due until " + time.Unix(e.DueAt, 0).UTC().Format(time.RFC3339) -} - // RequestStillQueuedError is returned when attempting to retry a run which has not yet completed. type RequestStillQueuedError struct { // Unix timestamps matching existing request's start and end. diff --git a/task/mock/scheduler.go b/task/mock/scheduler.go index 4327faea46..7695d71d83 100644 --- a/task/mock/scheduler.go +++ b/task/mock/scheduler.go @@ -62,7 +62,7 @@ func (s *Scheduler) ClaimTask(_ context.Context, task *platform.Task) error { _, ok := s.claims[task.ID] if ok { - return backend.ErrTaskAlreadyClaimed + return &platform.ErrTaskAlreadyClaimed } s.claims[task.ID] = task @@ -80,7 +80,7 @@ func (s *Scheduler) UpdateTask(_ context.Context, task *platform.Task) error { _, ok := s.claims[task.ID] if !ok { - return backend.ErrTaskNotClaimed + return &platform.ErrTaskNotClaimed } s.claims[task.ID] = task @@ -102,7 +102,7 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error { t, ok := s.claims[taskID] if !ok { - return backend.ErrTaskNotClaimed + return &platform.ErrTaskNotClaimed } if s.releaseChan != nil { s.releaseChan <- t @@ -302,7 +302,7 @@ func (p *RunPromise) Wait() (backend.RunResult, error) { func (p *RunPromise) Cancel() { p.cancelFunc() - p.Finish(nil, backend.ErrRunCanceled) + p.Finish(nil, &platform.ErrRunCanceled) } // Finish unblocks any call to Wait, to return r and err. diff --git a/task/mock/task_control_service.go b/task/mock/task_control_service.go index 3c93c6037d..0bf4096c2c 100644 --- a/task/mock/task_control_service.go +++ b/task/mock/task_control_service.go @@ -139,7 +139,7 @@ func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (back } } if dueAt := nextScheduledUnix + int64(offset); dueAt > now { - return backend.RunCreation{}, backend.RunNotYetDueError{DueAt: dueAt} + return backend.RunCreation{}, influxdb.ErrRunNotDueYet(dueAt) } runID := idgen.ID() diff --git a/task/options/options.go b/task/options/options.go index b6e0e1e592..0c9cb540a7 100644 --- a/task/options/options.go +++ b/task/options/options.go @@ -2,7 +2,6 @@ package options import ( - "errors" "fmt" "strings" "sync" @@ -234,7 +233,7 @@ func FromScript(script string) (Options, error) { // pull options from the program scope task, ok := scope.Lookup("task") if !ok { - return opt, errors.New("missing required option: 'task'") + return opt, ErrMissingRequiredTaskOption("task") } // check to make sure task is an object if err := checkNature(task.PolyType().Nature(), semantic.Object); err != nil { @@ -247,7 +246,7 @@ func FromScript(script string) (Options, error) { nameVal, ok := optObject.Get(optName) if !ok { - return opt, errors.New("missing name in task options") + return opt, ErrMissingRequiredTaskOption("name") } if err := checkNature(nameVal.PolyType().Nature(), semantic.String); err != nil { @@ -257,11 +256,11 @@ func FromScript(script string) (Options, error) { crVal, cronOK := optObject.Get(optCron) everyVal, everyOK := optObject.Get(optEvery) if cronOK && everyOK { - return opt, errors.New("cannot use both cron and every in task options") + return opt, ErrDuplicateIntervalField } if !cronOK && !everyOK { - return opt, errors.New("cron or every is required") + return opt, ErrMissingRequiredTaskOption("cron or every is required") } if cronOK { @@ -277,14 +276,14 @@ func FromScript(script string) (Options, error) { } dur, ok := durTypes["every"] if !ok || dur == nil { - return opt, errors.New("failed to parse `every` in task") + return opt, ErrParseTaskOptionField("every") } durNode, err := parseSignedDuration(dur.Location().Source) if err != nil { return opt, err } if !ok || durNode == nil { - return opt, errors.New("failed to parse `every` in task") + return opt, ErrParseTaskOptionField("every") } durNode.BaseNode = ast.BaseNode{} opt.Every.Node = *durNode @@ -296,14 +295,14 @@ func FromScript(script string) (Options, error) { } dur, ok := durTypes["offset"] if !ok || dur == nil { - return opt, errors.New("failed to parse `offset` in task") + return opt, ErrParseTaskOptionField("offset") } durNode, err := parseSignedDuration(dur.Location().Source) if err != nil { return opt, err } if !ok || durNode == nil { - return opt, errors.New("failed to parse `offset` in task") + return opt, ErrParseTaskOptionField("offset") } durNode.BaseNode = ast.BaseNode{} opt.Offset = &Duration{} diff --git a/task/options/options_errors.go b/task/options/options_errors.go new file mode 100644 index 0000000000..9ae9bee772 --- /dev/null +++ b/task/options/options_errors.go @@ -0,0 +1,17 @@ +package options + +import ( + "fmt" +) + +func ErrParseTaskOptionField(opt string) error { + return fmt.Errorf("failed to parse field '%s' in task options", opt) +} + +func ErrMissingRequiredTaskOption(opt string) error { + return fmt.Errorf("missing required option: %s", opt) +} + +var ( + ErrDuplicateIntervalField = fmt.Errorf("cannot use both cron and every in task options") +) diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index fc1aadcafc..bfeda72b5b 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -401,8 +401,8 @@ func testTaskCRUD(t *testing.T, sys *System) { } // Task should not be returned. - if _, err := sys.TaskService.FindTaskByID(sys.Ctx, origID); err != backend.ErrTaskNotFound { - t.Fatalf("expected %v, got %v", backend.ErrTaskNotFound, err) + if _, err := sys.TaskService.FindTaskByID(sys.Ctx, origID); err != &influxdb.ErrTaskNotFound { + t.Fatalf("expected %v, got %v", influxdb.ErrTaskNotFound, err) } } @@ -604,12 +604,12 @@ func testTaskRuns(t *testing.T, sys *System) { // check run filter errors _, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1}) - if err0 != backend.ErrOutOfBoundsLimit { + if err0 != &influxdb.ErrOutOfBoundsLimit { t.Fatalf("failed to error with out of bounds run limit: %d", -1) } _, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1}) - if err1 != backend.ErrOutOfBoundsLimit { + if err1 != &influxdb.ErrOutOfBoundsLimit { t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1) } @@ -685,13 +685,13 @@ func testTaskRuns(t *testing.T, sys *System) { // Look for a run that doesn't exist. _, err = sys.TaskService.FindRunByID(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64)) if err == nil { - t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err) + t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err) } // look for a taskID that doesn't exist. _, err = sys.TaskService.FindRunByID(sys.Ctx, influxdb.ID(math.MaxUint64), runs[0].ID) if err == nil { - t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err) + t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err) } foundRun0, err := sys.TaskService.FindRunByID(sys.Ctx, task.ID, runs[0].ID) @@ -956,7 +956,7 @@ func testTaskConcurrency(t *testing.T, sys *System) { if _, err := sys.TaskControlService.CreateNextRun(sys.Ctx, tid, math.MaxInt64>>6); err != nil { // we use the >>6 here because math.MaxInt64 is too large which causes problems when converting back and forth from time // This may have errored due to the task being deleted. Check if the task still exists. - if _, err2 := sys.TaskService.FindTaskByID(sys.Ctx, tid); err2 == backend.ErrTaskNotFound { + if _, err2 := sys.TaskService.FindTaskByID(sys.Ctx, tid); err2 == &influxdb.ErrTaskNotFound { // It was deleted. Just continue. continue } @@ -1046,12 +1046,12 @@ func testRunStorage(t *testing.T, sys *System) { // check run filter errors _, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1}) - if err0 != backend.ErrOutOfBoundsLimit { + if err0 != &influxdb.ErrOutOfBoundsLimit { t.Fatalf("failed to error with out of bounds run limit: %d", -1) } _, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1}) - if err1 != backend.ErrOutOfBoundsLimit { + if err1 != &influxdb.ErrOutOfBoundsLimit { t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1) } @@ -1153,13 +1153,13 @@ func testRunStorage(t *testing.T, sys *System) { _, err = sys.TaskService.FindRunByID(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64)) // TODO(lh): use kv.ErrRunNotFound in the future. Our error's are not exact if err == nil { - t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err) + t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err) } // look for a taskID that doesn't exist. _, err = sys.TaskService.FindRunByID(sys.Ctx, influxdb.ID(math.MaxUint64), runs[0].ID) if err == nil { - t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err) + t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err) } foundRun0, err := sys.TaskService.FindRunByID(sys.Ctx, task.ID, runs[0].ID) @@ -1196,7 +1196,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) { // Non-existent ID should return the right error. _, err = sys.TaskService.RetryRun(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64)) if !strings.Contains(err.Error(), "run not found") { - t.Errorf("expected retrying run that doesn't exist to return %v, got %v", backend.ErrRunNotFound, err) + t.Errorf("expected retrying run that doesn't exist to return %v, got %v", &influxdb.ErrRunNotFound, err) } requestedAtUnix := time.Now().Add(5 * time.Minute).UTC().Unix() // This should guarantee we can make a run. diff --git a/task_errors.go b/task_errors.go new file mode 100644 index 0000000000..cee420c301 --- /dev/null +++ b/task_errors.go @@ -0,0 +1,122 @@ +package influxdb + +import ( + "fmt" + "time" +) + +var ( + // ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally. + ErrRunCanceled = Error{ + Code: EInternal, + Msg: "run canceled", + } + + // ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not. + ErrTaskNotClaimed = Error{ + Code: EConflict, + Msg: "task not claimed", + } + + // ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is. + ErrTaskAlreadyClaimed = Error{ + Code: EConflict, + Msg: "task already claimed", + } + + // ErrNoRunsFound is returned when searching for a range of runs, but none are found. + ErrNoRunsFound = Error{ + Code: ENotFound, + Msg: "no matching runs found", + } + + // ErrInvalidTaskID error object for bad id's + ErrInvalidTaskID = Error{ + Code: EInvalid, + Msg: "invalid id", + } + + // ErrTaskNotFound indicates no task could be found for given parameters. + ErrTaskNotFound = Error{ + Code: ENotFound, + Msg: "task not found", + } + + // ErrRunNotFound is returned when searching for a single run that doesn't exist. + ErrRunNotFound = Error{ + Code: ENotFound, + Msg: "run not found", + } + + ErrPageSizeTooSmall = Error{ + Msg: "cannot have negative page limit", + Code: EInvalid, + } + + ErrPageSizeTooLarge = Error{ + Msg: fmt.Sprintf("cannot use page size larger then %d", MaxPageSize), + Code: EInvalid, + } + + ErrOrgNotFound = Error{ + Msg: "organization not found", + Code: ENotFound, + } + + ErrTaskRunAlreadyQueued = Error{ + Msg: "run already queued", + Code: EConflict, + } + + // ErrOutOfBoundsLimit is returned with FindRuns is called with an invalid filter limit. + ErrOutOfBoundsLimit = Error{ + Code: EUnprocessableEntity, + Msg: "run limit is out of bounds, must be between 1 and 500", + } +) + +func ErrInternalTaskServiceError(err error) *Error { + return &Error{ + Code: EInternal, + Msg: fmt.Sprintf("unexpected error in tasks; Err: %v", err), + Op: "kv/task", + Err: err, + } +} + +// ErrUnexpectedTaskBucketErr a generic error we can use when we rail to retrieve a bucket +func ErrUnexpectedTaskBucketErr(err error) *Error { + return &Error{ + Code: EInternal, + Msg: fmt.Sprintf("unexpected error retrieving task bucket; Err: %v", err), + Op: "kv/taskBucket", + Err: err, + } +} + +// ErrTaskTimeParse an error for time parsing errors +func ErrTaskTimeParse(err error) *Error { + return &Error{ + Code: EInvalid, + Msg: fmt.Sprintf("unexpected error parsing time; Err: %v", err), + Op: "kv/taskCron", + Err: err, + } +} + +func ErrTaskOptionParse(err error) *Error { + return &Error{ + Code: EInvalid, + Msg: fmt.Sprintf("invalid options; Err: %v", err), + Op: "kv/taskOptions", + Err: err, + } +} + +// ErrRunNotDueYet is returned from CreateNextRun if a run is not yet due. +func ErrRunNotDueYet(dueAt int64) *Error { + return &Error{ + Code: EInvalid, + Msg: fmt.Sprintf("run not due until: %v", time.Unix(dueAt, 0).UTC().Format(time.RFC3339)), + } +}