chore(tasks): use pointers for task errors (#14343)

pull/14347/head
Alirie Gray 2019-07-15 13:57:51 -07:00 committed by GitHub
parent d45bd1bb83
commit c7f09d6a56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 111 additions and 111 deletions

View File

@ -651,7 +651,7 @@ func (h *TaskHandler) handleUpdateTask(w http.ResponseWriter, r *http.Request) {
Err: err,
Msg: "failed to update task",
}
if err.Err == &influxdb.ErrTaskNotFound {
if err.Err == influxdb.ErrTaskNotFound {
err.Code = platform.ENotFound
}
h.HandleHTTPError(ctx, err, w)
@ -728,7 +728,7 @@ func (h *TaskHandler) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
Err: err,
Msg: "failed to delete task",
}
if err.Err == &influxdb.ErrTaskNotFound {
if err.Err == influxdb.ErrTaskNotFound {
err.Code = platform.ENotFound
}
h.HandleHTTPError(ctx, err, w)
@ -805,7 +805,7 @@ func (h *TaskHandler) handleGetLogs(w http.ResponseWriter, r *http.Request) {
Err: err,
Msg: "failed to find task logs",
}
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrNoRunsFound {
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrNoRunsFound {
err.Code = platform.ENotFound
}
h.HandleHTTPError(ctx, err, w)
@ -897,7 +897,7 @@ func (h *TaskHandler) handleGetRuns(w http.ResponseWriter, r *http.Request) {
Err: err,
Msg: "failed to find runs",
}
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrNoRunsFound {
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrNoRunsFound {
err.Code = platform.ENotFound
}
h.HandleHTTPError(ctx, err, w)
@ -948,7 +948,7 @@ func decodeGetRunsRequest(ctx context.Context, r *http.Request) (*getRunsRequest
}
if i < 1 || i > influxdb.TaskMaxPageSize {
return nil, &influxdb.ErrOutOfBoundsLimit
return nil, influxdb.ErrOutOfBoundsLimit
}
req.filter.Limit = i
}
@ -1001,7 +1001,7 @@ func (h *TaskHandler) handleForceRun(w http.ResponseWriter, r *http.Request) {
Err: err,
Msg: "failed to force run",
}
if err.Err == &influxdb.ErrTaskNotFound {
if err.Err == influxdb.ErrTaskNotFound {
err.Code = platform.ENotFound
}
h.HandleHTTPError(ctx, err, w)
@ -1100,7 +1100,7 @@ func (h *TaskHandler) handleGetRun(w http.ResponseWriter, r *http.Request) {
Err: err,
Msg: "failed to find run",
}
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrRunNotFound {
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrRunNotFound {
err.Code = platform.ENotFound
}
h.HandleHTTPError(ctx, err, w)
@ -1206,7 +1206,7 @@ func (h *TaskHandler) handleCancelRun(w http.ResponseWriter, r *http.Request) {
Err: err,
Msg: "failed to cancel run",
}
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrRunNotFound {
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrRunNotFound {
err.Code = platform.ENotFound
}
h.HandleHTTPError(ctx, err, w)
@ -1257,7 +1257,7 @@ func (h *TaskHandler) handleRetryRun(w http.ResponseWriter, r *http.Request) {
Err: err,
Msg: "failed to retry run",
}
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &platform.ErrRunNotFound {
if err.Err == influxdb.ErrTaskNotFound || err.Err == influxdb.ErrRunNotFound {
err.Code = platform.ENotFound
}
h.HandleHTTPError(ctx, err, w)
@ -1393,7 +1393,7 @@ func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platfor
// ErrTaskNotFound is expected as part of the FindTaskByID contract,
// so return that actual error instead of a different error that looks like it.
// TODO cleanup backend task service error implementation
return nil, &influxdb.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
return nil, err
}
@ -1646,7 +1646,7 @@ func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([
}
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
return nil, 0, &influxdb.ErrOutOfBoundsLimit
return nil, 0, influxdb.ErrOutOfBoundsLimit
}
val.Set("limit", strconv.Itoa(filter.Limit))
@ -1714,7 +1714,7 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID)
// ErrRunNotFound is expected as part of the FindRunByID contract,
// so return that actual error instead of a different error that looks like it.
// TODO cleanup backend error implementation
return nil, &platform.ErrRunNotFound
return nil, platform.ErrRunNotFound
}
return nil, err
@ -1757,7 +1757,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (*
// ErrRunNotFound is expected as part of the RetryRun contract,
// so return that actual error instead of a different error that looks like it.
// TODO cleanup backend task error implementation
return nil, &platform.ErrRunNotFound
return nil, platform.ErrRunNotFound
}
// RequestStillQueuedError is also part of the contract.
if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil {
@ -1803,7 +1803,7 @@ func (t TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduled
if platform.ErrorCode(err) == platform.ENotFound {
// ErrRunNotFound is expected as part of the RetryRun contract,
// so return that actual error instead of a different error that looks like it.
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
// RequestStillQueuedError is also part of the contract.

View File

@ -807,7 +807,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
return &platform.Task{ID: taskID, Organization: "o"}, nil
}
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
},
},
method: http.MethodGet,
@ -823,7 +823,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
return &platform.Task{ID: taskID, Organization: "o"}, nil
}
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
},
},
method: http.MethodPatch,
@ -840,7 +840,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
return nil
}
return &platform.ErrTaskNotFound
return platform.ErrTaskNotFound
},
},
method: http.MethodDelete,
@ -856,7 +856,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
return nil, 0, nil
}
return nil, 0, &platform.ErrTaskNotFound
return nil, 0, platform.ErrTaskNotFound
},
},
method: http.MethodGet,
@ -869,10 +869,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
svc: &mock.TaskService{
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
if f.Task != taskID {
return nil, 0, &platform.ErrTaskNotFound
return nil, 0, platform.ErrTaskNotFound
}
if *f.Run != runID {
return nil, 0, &platform.ErrNoRunsFound
return nil, 0, platform.ErrNoRunsFound
}
return nil, 0, nil
@ -888,7 +888,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
svc: &mock.TaskService{
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
if f.Task != taskID {
return nil, 0, &platform.ErrTaskNotFound
return nil, 0, platform.ErrTaskNotFound
}
return nil, 0, nil
@ -904,7 +904,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
svc: &mock.TaskService{
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
if f.Task != taskID {
return nil, 0, &platform.ErrNoRunsFound
return nil, 0, platform.ErrNoRunsFound
}
return nil, 0, nil
@ -920,7 +920,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
svc: &mock.TaskService{
ForceRunFn: func(_ context.Context, tid platform.ID, _ int64) (*platform.Run, error) {
if tid != taskID {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
@ -937,10 +937,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
svc: &mock.TaskService{
FindRunByIDFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) {
if tid != taskID {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
if rid != runID {
return nil, &platform.ErrRunNotFound
return nil, platform.ErrRunNotFound
}
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
@ -956,10 +956,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
svc: &mock.TaskService{
RetryRunFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) {
if tid != taskID {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
if rid != runID {
return nil, &platform.ErrRunNotFound
return nil, platform.ErrRunNotFound
}
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
@ -975,10 +975,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
svc: &mock.TaskService{
CancelRunFn: func(_ context.Context, tid, rid platform.ID) error {
if tid != taskID {
return &platform.ErrTaskNotFound
return platform.ErrTaskNotFound
}
if rid != runID {
return &platform.ErrRunNotFound
return platform.ErrRunNotFound
}
return nil
@ -1450,7 +1450,7 @@ func TestTaskHandler_Sessions(t *testing.T) {
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
if id != taskID {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
return &platform.Task{
@ -1542,7 +1542,7 @@ func TestTaskHandler_Sessions(t *testing.T) {
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
if id != taskID {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
return &platform.Task{
@ -1638,7 +1638,7 @@ func TestTaskHandler_Sessions(t *testing.T) {
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
if id != taskID {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
return &platform.Task{
@ -1733,7 +1733,7 @@ func TestTaskHandler_Sessions(t *testing.T) {
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
if id != taskID {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
return &platform.Task{

View File

@ -78,7 +78,7 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf
v, err := b.Get(taskKey)
if IsNotFound(err) {
return nil, &influxdb.ErrTaskNotFound
return nil, influxdb.ErrTaskNotFound
}
if err != nil {
return nil, err
@ -151,10 +151,10 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt
// complain about limits
if filter.Limit < 0 {
return nil, 0, &influxdb.ErrPageSizeTooSmall
return nil, 0, influxdb.ErrPageSizeTooSmall
}
if filter.Limit > influxdb.TaskMaxPageSize {
return nil, 0, &influxdb.ErrPageSizeTooLarge
return nil, 0, influxdb.ErrPageSizeTooLarge
}
if filter.Limit == 0 {
filter.Limit = influxdb.TaskDefaultPageSize
@ -183,7 +183,7 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt
// findTasksByUser is a subset of the find tasks function. Used for cleanliness
func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
if filter.User == nil {
return nil, 0, &influxdb.ErrTaskNotFound
return nil, 0, influxdb.ErrTaskNotFound
}
var org *influxdb.Organization
var err error
@ -216,10 +216,10 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
for _, m := range maps {
task, err := s.findTaskByID(ctx, tx, m.ResourceID)
if err != nil && err == &influxdb.ErrTaskNotFound {
if err != nil && err == influxdb.ErrTaskNotFound {
return nil, 0, err
}
if err == &influxdb.ErrTaskNotFound {
if err == influxdb.ErrTaskNotFound {
continue
}
@ -253,7 +253,7 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
}
if org == nil {
return nil, 0, &influxdb.ErrTaskNotFound
return nil, 0, influxdb.ErrTaskNotFound
}
var ts []*influxdb.Task
@ -281,17 +281,17 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
// orgID
key, err := org.ID.Encode()
if err != nil {
return nil, 0, &influxdb.ErrInvalidTaskID
return nil, 0, influxdb.ErrInvalidTaskID
}
k, v := c.Seek(key)
if k != nil {
id, err := influxdb.IDFromString(string(v))
if err != nil {
return nil, 0, &influxdb.ErrInvalidTaskID
return nil, 0, influxdb.ErrInvalidTaskID
}
t, err := s.findTaskByID(ctx, tx, *id)
if err != nil && err != &influxdb.ErrTaskNotFound {
if err != nil && err != influxdb.ErrTaskNotFound {
// we might have some crufty index's
return nil, 0, err
}
@ -317,12 +317,12 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
id, err := influxdb.IDFromString(string(v))
if err != nil {
return nil, 0, &influxdb.ErrInvalidTaskID
return nil, 0, influxdb.ErrInvalidTaskID
}
t, err := s.findTaskByID(ctx, tx, *id)
if err != nil {
if err == &influxdb.ErrTaskNotFound {
if err == influxdb.ErrTaskNotFound {
// we might have some crufty index's
continue
}
@ -478,7 +478,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
}
}
if org == nil {
return nil, &influxdb.ErrOrgNotFound
return nil, influxdb.ErrOrgNotFound
}
opt, err := options.FromScript(tc.Flux)
@ -799,7 +799,7 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter
}
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
return nil, 0, &influxdb.ErrOutOfBoundsLimit
return nil, 0, influxdb.ErrOutOfBoundsLimit
}
var runs []*influxdb.Run
@ -861,7 +861,7 @@ func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb
runBytes, err := bucket.Get(key)
if err != nil {
if IsNotFound(err) {
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
@ -961,7 +961,7 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID
runsBytes, err := bucket.Get(key)
if err != nil {
if err != ErrKeyNotFound {
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
@ -1029,7 +1029,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched
// check to see if this run is already queued
for _, run := range runs {
if run.ScheduledFor == r.ScheduledFor {
return nil, &influxdb.ErrTaskRunAlreadyQueued
return nil, influxdb.ErrTaskRunAlreadyQueued
}
}
runs = append(runs, r)
@ -1424,11 +1424,11 @@ func (s *Service) startManualRun(ctx context.Context, tx Tx, taskID, runID influ
mRuns, err := s.manualRuns(ctx, tx, taskID)
if err != nil {
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
if len(mRuns) < 1 {
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
var run *influxdb.Run
@ -1439,7 +1439,7 @@ func (s *Service) startManualRun(ctx context.Context, tx Tx, taskID, runID influ
}
}
if run == nil {
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
// save manual runs
@ -1724,7 +1724,7 @@ func (s *Service) findLatestCompletedTime(ctx context.Context, tx Tx, id influxd
func taskKey(taskID influxdb.ID) ([]byte, error) {
encodedID, err := taskID.Encode()
if err != nil {
return nil, &influxdb.ErrInvalidTaskID
return nil, influxdb.ErrInvalidTaskID
}
return encodedID, nil
}
@ -1732,7 +1732,7 @@ func taskKey(taskID influxdb.ID) ([]byte, error) {
func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) {
encodedID, err := taskID.Encode()
if err != nil {
return nil, &influxdb.ErrInvalidTaskID
return nil, influxdb.ErrInvalidTaskID
}
return []byte(string(encodedID) + "/latestCompleted"), nil
}
@ -1740,7 +1740,7 @@ func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) {
func taskManualRunKey(taskID influxdb.ID) ([]byte, error) {
encodedID, err := taskID.Encode()
if err != nil {
return nil, &influxdb.ErrInvalidTaskID
return nil, influxdb.ErrInvalidTaskID
}
return []byte(string(encodedID) + "/manualRuns"), nil
}
@ -1748,11 +1748,11 @@ func taskManualRunKey(taskID influxdb.ID) ([]byte, error) {
func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) {
encodedOrgID, err := orgID.Encode()
if err != nil {
return nil, &influxdb.ErrInvalidTaskID
return nil, influxdb.ErrInvalidTaskID
}
encodedID, err := taskID.Encode()
if err != nil {
return nil, &influxdb.ErrInvalidTaskID
return nil, influxdb.ErrInvalidTaskID
}
return []byte(string(encodedOrgID) + "/" + string(encodedID)), nil
@ -1761,11 +1761,11 @@ func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) {
func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) {
encodedID, err := taskID.Encode()
if err != nil {
return nil, &influxdb.ErrInvalidTaskID
return nil, influxdb.ErrInvalidTaskID
}
encodedRunID, err := runID.Encode()
if err != nil {
return nil, &influxdb.ErrInvalidTaskID
return nil, influxdb.ErrInvalidTaskID
}
return []byte(string(encodedID) + "/" + string(encodedRunID)), nil

View File

@ -151,7 +151,7 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
}
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
return nil, 0, &influxdb.ErrOutOfBoundsLimit
return nil, 0, influxdb.ErrOutOfBoundsLimit
}
runs, n, err := as.TaskService.FindRuns(ctx, filter)
@ -264,7 +264,7 @@ func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID infl
}
if len(re.runs) == 0 {
return nil, &platform.ErrRunNotFound
return nil, platform.ErrRunNotFound
}

View File

@ -121,12 +121,12 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo
// If disabling the task, do so before modifying the script.
if task.Status != oldTask.Status && task.Status == string(backend.TaskInactive) {
if err := c.sch.ReleaseTask(id); err != nil && err != &platform.ErrTaskNotClaimed {
if err := c.sch.ReleaseTask(id); err != nil && err != platform.ErrTaskNotClaimed {
return task, err
}
}
if err := c.sch.UpdateTask(ctx, task); err != nil && err != &platform.ErrTaskNotClaimed {
if err := c.sch.UpdateTask(ctx, task); err != nil && err != platform.ErrTaskNotClaimed {
return task, err
}
@ -139,7 +139,7 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo
return task, err
}
if err := c.sch.ClaimTask(ctx, task); err != nil && err != &platform.ErrTaskAlreadyClaimed {
if err := c.sch.ClaimTask(ctx, task); err != nil && err != platform.ErrTaskAlreadyClaimed {
return task, err
}
}
@ -148,7 +148,7 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo
}
func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) error {
if err := c.sch.ReleaseTask(id); err != nil && err != &platform.ErrTaskNotClaimed {
if err := c.sch.ReleaseTask(id); err != nil && err != platform.ErrTaskNotClaimed {
return err
}

View File

@ -58,7 +58,7 @@ func inmemTaskService() platform.TaskService {
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
if upd.Flux != nil {
t.Flux = *upd.Flux
@ -78,7 +78,7 @@ func inmemTaskService() platform.TaskService {
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
newt := *t
return &newt, nil
@ -100,7 +100,7 @@ func inmemTaskService() platform.TaskService {
defer mu.Unlock()
t, ok := tasks[id]
if !ok {
return nil, &platform.ErrTaskNotFound
return nil, platform.ErrTaskNotFound
}
return &platform.Run{ID: id, TaskID: t.ID, ScheduledFor: time.Unix(scheduledFor, 0).Format(time.RFC3339)}, nil
@ -276,7 +276,7 @@ func TestCoordinator_DeleteUnclaimedTask(t *testing.T) {
t.Fatal(err)
}
if _, err := ts.FindTaskByID(context.Background(), task.ID); err != &platform.ErrTaskNotFound {
if _, err := ts.FindTaskByID(context.Background(), task.ID); err != platform.ErrTaskNotFound {
t.Fatalf("expected deleted task not to be found; got %v", err)
}
}

View File

@ -126,7 +126,7 @@ func (p *syncRunPromise) Wait() (backend.RunResult, error) {
}
func (p *syncRunPromise) Cancel() {
p.finish(nil, &influxdb.ErrRunCanceled)
p.finish(nil, influxdb.ErrRunCanceled)
}
func (p *syncRunPromise) finish(res *runResult, err error) {
@ -316,7 +316,7 @@ func (p *asyncRunPromise) Wait() (backend.RunResult, error) {
}
func (p *asyncRunPromise) Cancel() {
p.finish(nil, &influxdb.ErrRunCanceled)
p.finish(nil, influxdb.ErrRunCanceled)
}
// followQuery waits for the query to become ready and sets p's results.

View File

@ -441,7 +441,7 @@ func testExecutorPromiseCancel(t *testing.T, fn createSysFn) {
rp.Cancel()
res, err := rp.Wait()
if err != &platform.ErrRunCanceled {
if err != platform.ErrRunCanceled {
t.Fatalf("expected ErrRunCanceled, got %v", err)
}
if res != nil {

View File

@ -161,7 +161,7 @@ func (e *TaskExecutor) startManualRun(ctx context.Context, id influxdb.ID, sched
return e.createPromise(ctx, r)
}
}
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
func (e *TaskExecutor) resumeRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*Promise, error) {
@ -179,7 +179,7 @@ func (e *TaskExecutor) resumeRun(ctx context.Context, id influxdb.ID, scheduledA
return e.createPromise(ctx, run)
}
}
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*Promise, error) {
@ -290,7 +290,7 @@ func (w *worker) work() {
case <-prom.ctx.Done():
w.te.tcs.AddRunLog(prom.ctx, prom.task.ID, prom.run.ID, time.Now(), "Run canceled")
w.te.tcs.UpdateRunState(prom.ctx, prom.task.ID, prom.run.ID, time.Now(), backend.RunCanceled)
prom.err = &influxdb.ErrRunCanceled
prom.err = influxdb.ErrRunCanceled
close(prom.done)
return
case <-time.After(time.Second):

View File

@ -198,13 +198,13 @@ func (s *TickScheduler) CancelRun(_ context.Context, taskID, runID platform.ID)
defer s.schedulerMu.Unlock()
ts, ok := s.taskSchedulers[taskID]
if !ok {
return &platform.ErrTaskNotFound
return platform.ErrTaskNotFound
}
ts.runningMu.Lock()
c, ok := ts.running[runID]
if !ok {
ts.runningMu.Unlock()
return &platform.ErrRunNotFound
return platform.ErrRunNotFound
}
ts.runningMu.Unlock()
if c.CancelFunc != nil {
@ -303,7 +303,7 @@ func (s *TickScheduler) ClaimTask(authCtx context.Context, task *platform.Task)
_, ok := s.taskSchedulers[task.ID]
if ok {
err = &platform.ErrTaskAlreadyClaimed
err = platform.ErrTaskAlreadyClaimed
return err
}
@ -338,7 +338,7 @@ func (s *TickScheduler) UpdateTask(authCtx context.Context, task *platform.Task)
ts, ok := s.taskSchedulers[task.ID]
if !ok {
return &platform.ErrTaskNotClaimed
return platform.ErrTaskNotClaimed
}
ts.task = task
@ -392,7 +392,7 @@ func (s *TickScheduler) ReleaseTask(taskID platform.ID) error {
t, ok := s.taskSchedulers[taskID]
if !ok {
return &platform.ErrTaskNotClaimed
return platform.ErrTaskNotClaimed
}
t.Cancel()
@ -748,7 +748,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
rr, err := rp.Wait()
close(ready)
if err != nil {
if err == &platform.ErrRunCanceled {
if err == platform.ErrRunCanceled {
r.updateRunState(qr, RunCanceled, runLogger)
errMsg = "Waiting for execution result failed, " + errMsg
// Move on to the next execution, for a canceled run.

View File

@ -64,7 +64,7 @@ func TestScheduler_Cancelation(t *testing.T) {
// check for when we cancel something already canceled
time.Sleep(500 * time.Millisecond)
if err = o.CancelRun(context.Background(), task.ID, run.ID); err != &platform.ErrRunNotFound {
if err = o.CancelRun(context.Background(), task.ID, run.ID); err != platform.ErrRunNotFound {
t.Fatalf("expected ErrRunNotFound but got %s", err)
}
}

View File

@ -62,7 +62,7 @@ func (s *Scheduler) ClaimTask(_ context.Context, task *platform.Task) error {
_, ok := s.claims[task.ID]
if ok {
return &platform.ErrTaskAlreadyClaimed
return platform.ErrTaskAlreadyClaimed
}
s.claims[task.ID] = task
@ -80,7 +80,7 @@ func (s *Scheduler) UpdateTask(_ context.Context, task *platform.Task) error {
_, ok := s.claims[task.ID]
if !ok {
return &platform.ErrTaskNotClaimed
return platform.ErrTaskNotClaimed
}
s.claims[task.ID] = task
@ -102,7 +102,7 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error {
t, ok := s.claims[taskID]
if !ok {
return &platform.ErrTaskNotClaimed
return platform.ErrTaskNotClaimed
}
if s.releaseChan != nil {
s.releaseChan <- t
@ -302,7 +302,7 @@ func (p *RunPromise) Wait() (backend.RunResult, error) {
func (p *RunPromise) Cancel() {
p.cancelFunc()
p.Finish(nil, &platform.ErrRunCanceled)
p.Finish(nil, platform.ErrRunCanceled)
}
// Finish unblocks any call to Wait, to return r and err.

View File

@ -192,7 +192,7 @@ func (t *TaskControlService) StartManualRun(_ context.Context, taskID, runID inf
}
}
if run == nil {
return nil, &influxdb.ErrRunNotFound
return nil, influxdb.ErrRunNotFound
}
return run, nil
}

View File

@ -396,7 +396,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
// Task should not be returned.
if _, err := sys.TaskService.FindTaskByID(sys.Ctx, origID); err != &influxdb.ErrTaskNotFound {
if _, err := sys.TaskService.FindTaskByID(sys.Ctx, origID); err != influxdb.ErrTaskNotFound {
t.Fatalf("expected %v, got %v", influxdb.ErrTaskNotFound, err)
}
}
@ -599,12 +599,12 @@ func testTaskRuns(t *testing.T, sys *System) {
// check run filter errors
_, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1})
if err0 != &influxdb.ErrOutOfBoundsLimit {
if err0 != influxdb.ErrOutOfBoundsLimit {
t.Fatalf("failed to error with out of bounds run limit: %d", -1)
}
_, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1})
if err1 != &influxdb.ErrOutOfBoundsLimit {
if err1 != influxdb.ErrOutOfBoundsLimit {
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
}
@ -680,13 +680,13 @@ func testTaskRuns(t *testing.T, sys *System) {
// Look for a run that doesn't exist.
_, err = sys.TaskService.FindRunByID(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
if err == nil {
t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err)
t.Fatalf("expected %s but got %s instead", influxdb.ErrRunNotFound, err)
}
// look for a taskID that doesn't exist.
_, err = sys.TaskService.FindRunByID(sys.Ctx, influxdb.ID(math.MaxUint64), runs[0].ID)
if err == nil {
t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err)
t.Fatalf("expected %s but got %s instead", influxdb.ErrRunNotFound, err)
}
foundRun0, err := sys.TaskService.FindRunByID(sys.Ctx, task.ID, runs[0].ID)
@ -951,7 +951,7 @@ func testTaskConcurrency(t *testing.T, sys *System) {
if _, err := sys.TaskControlService.CreateNextRun(sys.Ctx, tid, math.MaxInt64>>6); err != nil { // we use the >>6 here because math.MaxInt64 is too large which causes problems when converting back and forth from time
// This may have errored due to the task being deleted. Check if the task still exists.
if _, err2 := sys.TaskService.FindTaskByID(sys.Ctx, tid); err2 == &influxdb.ErrTaskNotFound {
if _, err2 := sys.TaskService.FindTaskByID(sys.Ctx, tid); err2 == influxdb.ErrTaskNotFound {
// It was deleted. Just continue.
continue
}
@ -1041,12 +1041,12 @@ func testRunStorage(t *testing.T, sys *System) {
// check run filter errors
_, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1})
if err0 != &influxdb.ErrOutOfBoundsLimit {
if err0 != influxdb.ErrOutOfBoundsLimit {
t.Fatalf("failed to error with out of bounds run limit: %d", -1)
}
_, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1})
if err1 != &influxdb.ErrOutOfBoundsLimit {
if err1 != influxdb.ErrOutOfBoundsLimit {
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
}
@ -1148,13 +1148,13 @@ func testRunStorage(t *testing.T, sys *System) {
_, err = sys.TaskService.FindRunByID(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
// TODO(lh): use kv.ErrRunNotFound in the future. Our error's are not exact
if err == nil {
t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err)
t.Fatalf("expected %s but got %s instead", influxdb.ErrRunNotFound, err)
}
// look for a taskID that doesn't exist.
_, err = sys.TaskService.FindRunByID(sys.Ctx, influxdb.ID(math.MaxUint64), runs[0].ID)
if err == nil {
t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err)
t.Fatalf("expected %s but got %s instead", influxdb.ErrRunNotFound, err)
}
foundRun0, err := sys.TaskService.FindRunByID(sys.Ctx, task.ID, runs[0].ID)
@ -1191,7 +1191,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) {
// Non-existent ID should return the right error.
_, err = sys.TaskService.RetryRun(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
if !strings.Contains(err.Error(), "run not found") {
t.Errorf("expected retrying run that doesn't exist to return %v, got %v", &influxdb.ErrRunNotFound, err)
t.Errorf("expected retrying run that doesn't exist to return %v, got %v", influxdb.ErrRunNotFound, err)
}
requestedAtUnix := time.Now().Add(5 * time.Minute).UTC().Unix() // This should guarantee we can make a run.

View File

@ -7,69 +7,69 @@ import (
var (
// ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally.
ErrRunCanceled = Error{
ErrRunCanceled = &Error{
Code: EInternal,
Msg: "run canceled",
}
// ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not.
ErrTaskNotClaimed = Error{
ErrTaskNotClaimed = &Error{
Code: EConflict,
Msg: "task not claimed",
}
// ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is.
ErrTaskAlreadyClaimed = Error{
ErrTaskAlreadyClaimed = &Error{
Code: EConflict,
Msg: "task already claimed",
}
// ErrNoRunsFound is returned when searching for a range of runs, but none are found.
ErrNoRunsFound = Error{
ErrNoRunsFound = &Error{
Code: ENotFound,
Msg: "no matching runs found",
}
// ErrInvalidTaskID error object for bad id's
ErrInvalidTaskID = Error{
ErrInvalidTaskID = &Error{
Code: EInvalid,
Msg: "invalid id",
}
// ErrTaskNotFound indicates no task could be found for given parameters.
ErrTaskNotFound = Error{
ErrTaskNotFound = &Error{
Code: ENotFound,
Msg: "task not found",
}
// ErrRunNotFound is returned when searching for a single run that doesn't exist.
ErrRunNotFound = Error{
ErrRunNotFound = &Error{
Code: ENotFound,
Msg: "run not found",
}
ErrPageSizeTooSmall = Error{
ErrPageSizeTooSmall = &Error{
Msg: "cannot have negative page limit",
Code: EInvalid,
}
ErrPageSizeTooLarge = Error{
ErrPageSizeTooLarge = &Error{
Msg: fmt.Sprintf("cannot use page size larger then %d", MaxPageSize),
Code: EInvalid,
}
ErrOrgNotFound = Error{
ErrOrgNotFound = &Error{
Msg: "organization not found",
Code: ENotFound,
}
ErrTaskRunAlreadyQueued = Error{
ErrTaskRunAlreadyQueued = &Error{
Msg: "run already queued",
Code: EConflict,
}
// ErrOutOfBoundsLimit is returned with FindRuns is called with an invalid filter limit.
ErrOutOfBoundsLimit = Error{
ErrOutOfBoundsLimit = &Error{
Code: EUnprocessableEntity,
Msg: "run limit is out of bounds, must be between 1 and 500",
}