chore(tasks): consolidate task errors into task_errors.go
parent
69542b1752
commit
66157c9d44
|
@ -644,7 +644,7 @@ func (h *TaskHandler) handleUpdateTask(w http.ResponseWriter, r *http.Request) {
|
||||||
Err: err,
|
Err: err,
|
||||||
Msg: "failed to update task",
|
Msg: "failed to update task",
|
||||||
}
|
}
|
||||||
if err.Err == backend.ErrTaskNotFound {
|
if err.Err == &influxdb.ErrTaskNotFound {
|
||||||
err.Code = platform.ENotFound
|
err.Code = platform.ENotFound
|
||||||
}
|
}
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
|
@ -721,7 +721,7 @@ func (h *TaskHandler) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
|
||||||
Err: err,
|
Err: err,
|
||||||
Msg: "failed to delete task",
|
Msg: "failed to delete task",
|
||||||
}
|
}
|
||||||
if err.Err == backend.ErrTaskNotFound {
|
if err.Err == &influxdb.ErrTaskNotFound {
|
||||||
err.Code = platform.ENotFound
|
err.Code = platform.ENotFound
|
||||||
}
|
}
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
|
@ -798,7 +798,7 @@ func (h *TaskHandler) handleGetLogs(w http.ResponseWriter, r *http.Request) {
|
||||||
Err: err,
|
Err: err,
|
||||||
Msg: "failed to find task logs",
|
Msg: "failed to find task logs",
|
||||||
}
|
}
|
||||||
if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrNoRunsFound {
|
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrNoRunsFound {
|
||||||
err.Code = platform.ENotFound
|
err.Code = platform.ENotFound
|
||||||
}
|
}
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
|
@ -890,7 +890,7 @@ func (h *TaskHandler) handleGetRuns(w http.ResponseWriter, r *http.Request) {
|
||||||
Err: err,
|
Err: err,
|
||||||
Msg: "failed to find runs",
|
Msg: "failed to find runs",
|
||||||
}
|
}
|
||||||
if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrNoRunsFound {
|
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrNoRunsFound {
|
||||||
err.Code = platform.ENotFound
|
err.Code = platform.ENotFound
|
||||||
}
|
}
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
|
@ -941,7 +941,7 @@ func decodeGetRunsRequest(ctx context.Context, r *http.Request) (*getRunsRequest
|
||||||
}
|
}
|
||||||
|
|
||||||
if i < 1 || i > influxdb.TaskMaxPageSize {
|
if i < 1 || i > influxdb.TaskMaxPageSize {
|
||||||
return nil, backend.ErrOutOfBoundsLimit
|
return nil, &influxdb.ErrOutOfBoundsLimit
|
||||||
}
|
}
|
||||||
req.filter.Limit = i
|
req.filter.Limit = i
|
||||||
}
|
}
|
||||||
|
@ -994,7 +994,7 @@ func (h *TaskHandler) handleForceRun(w http.ResponseWriter, r *http.Request) {
|
||||||
Err: err,
|
Err: err,
|
||||||
Msg: "failed to force run",
|
Msg: "failed to force run",
|
||||||
}
|
}
|
||||||
if err.Err == backend.ErrTaskNotFound {
|
if err.Err == &influxdb.ErrTaskNotFound {
|
||||||
err.Code = platform.ENotFound
|
err.Code = platform.ENotFound
|
||||||
}
|
}
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
|
@ -1093,7 +1093,7 @@ func (h *TaskHandler) handleGetRun(w http.ResponseWriter, r *http.Request) {
|
||||||
Err: err,
|
Err: err,
|
||||||
Msg: "failed to find run",
|
Msg: "failed to find run",
|
||||||
}
|
}
|
||||||
if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrRunNotFound {
|
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrRunNotFound {
|
||||||
err.Code = platform.ENotFound
|
err.Code = platform.ENotFound
|
||||||
}
|
}
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
|
@ -1199,7 +1199,7 @@ func (h *TaskHandler) handleCancelRun(w http.ResponseWriter, r *http.Request) {
|
||||||
Err: err,
|
Err: err,
|
||||||
Msg: "failed to cancel run",
|
Msg: "failed to cancel run",
|
||||||
}
|
}
|
||||||
if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrRunNotFound {
|
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &influxdb.ErrRunNotFound {
|
||||||
err.Code = platform.ENotFound
|
err.Code = platform.ENotFound
|
||||||
}
|
}
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
|
@ -1250,7 +1250,7 @@ func (h *TaskHandler) handleRetryRun(w http.ResponseWriter, r *http.Request) {
|
||||||
Err: err,
|
Err: err,
|
||||||
Msg: "failed to retry run",
|
Msg: "failed to retry run",
|
||||||
}
|
}
|
||||||
if err.Err == backend.ErrTaskNotFound || err.Err == backend.ErrRunNotFound {
|
if err.Err == &influxdb.ErrTaskNotFound || err.Err == &platform.ErrRunNotFound {
|
||||||
err.Code = platform.ENotFound
|
err.Code = platform.ENotFound
|
||||||
}
|
}
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
|
@ -1387,7 +1387,7 @@ func (t TaskService) FindTaskByID(ctx context.Context, id platform.ID) (*platfor
|
||||||
// ErrTaskNotFound is expected as part of the FindTaskByID contract,
|
// ErrTaskNotFound is expected as part of the FindTaskByID contract,
|
||||||
// so return that actual error instead of a different error that looks like it.
|
// so return that actual error instead of a different error that looks like it.
|
||||||
// TODO cleanup backend task service error implementation
|
// TODO cleanup backend task service error implementation
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &influxdb.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1645,7 +1645,7 @@ func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([
|
||||||
}
|
}
|
||||||
|
|
||||||
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
||||||
return nil, 0, backend.ErrOutOfBoundsLimit
|
return nil, 0, &influxdb.ErrOutOfBoundsLimit
|
||||||
}
|
}
|
||||||
val.Set("limit", strconv.Itoa(filter.Limit))
|
val.Set("limit", strconv.Itoa(filter.Limit))
|
||||||
|
|
||||||
|
@ -1715,7 +1715,7 @@ func (t TaskService) FindRunByID(ctx context.Context, taskID, runID platform.ID)
|
||||||
// ErrRunNotFound is expected as part of the FindRunByID contract,
|
// ErrRunNotFound is expected as part of the FindRunByID contract,
|
||||||
// so return that actual error instead of a different error that looks like it.
|
// so return that actual error instead of a different error that looks like it.
|
||||||
// TODO cleanup backend error implementation
|
// TODO cleanup backend error implementation
|
||||||
return nil, backend.ErrRunNotFound
|
return nil, &platform.ErrRunNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1759,7 +1759,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID platform.ID) (*
|
||||||
// ErrRunNotFound is expected as part of the RetryRun contract,
|
// ErrRunNotFound is expected as part of the RetryRun contract,
|
||||||
// so return that actual error instead of a different error that looks like it.
|
// so return that actual error instead of a different error that looks like it.
|
||||||
// TODO cleanup backend task error implementation
|
// TODO cleanup backend task error implementation
|
||||||
return nil, backend.ErrRunNotFound
|
return nil, &platform.ErrRunNotFound
|
||||||
}
|
}
|
||||||
// RequestStillQueuedError is also part of the contract.
|
// RequestStillQueuedError is also part of the contract.
|
||||||
if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil {
|
if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil {
|
||||||
|
@ -1806,7 +1806,7 @@ func (t TaskService) ForceRun(ctx context.Context, taskID platform.ID, scheduled
|
||||||
if platform.ErrorCode(err) == platform.ENotFound {
|
if platform.ErrorCode(err) == platform.ENotFound {
|
||||||
// ErrRunNotFound is expected as part of the RetryRun contract,
|
// ErrRunNotFound is expected as part of the RetryRun contract,
|
||||||
// so return that actual error instead of a different error that looks like it.
|
// so return that actual error instead of a different error that looks like it.
|
||||||
return nil, backend.ErrRunNotFound
|
return nil, &influxdb.ErrRunNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestStillQueuedError is also part of the contract.
|
// RequestStillQueuedError is also part of the contract.
|
||||||
|
|
|
@ -802,7 +802,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
return &platform.Task{ID: taskID, Organization: "o"}, nil
|
return &platform.Task{ID: taskID, Organization: "o"}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
method: http.MethodGet,
|
method: http.MethodGet,
|
||||||
|
@ -818,7 +818,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
return &platform.Task{ID: taskID, Organization: "o"}, nil
|
return &platform.Task{ID: taskID, Organization: "o"}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
method: http.MethodPatch,
|
method: http.MethodPatch,
|
||||||
|
@ -835,7 +835,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return backend.ErrTaskNotFound
|
return &platform.ErrTaskNotFound
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
method: http.MethodDelete,
|
method: http.MethodDelete,
|
||||||
|
@ -851,7 +851,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
return nil, 0, nil
|
return nil, 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, 0, backend.ErrTaskNotFound
|
return nil, 0, &platform.ErrTaskNotFound
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
method: http.MethodGet,
|
method: http.MethodGet,
|
||||||
|
@ -864,10 +864,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
|
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
if f.Task != taskID {
|
if f.Task != taskID {
|
||||||
return nil, 0, backend.ErrTaskNotFound
|
return nil, 0, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
if *f.Run != runID {
|
if *f.Run != runID {
|
||||||
return nil, 0, backend.ErrNoRunsFound
|
return nil, 0, &platform.ErrNoRunsFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, 0, nil
|
return nil, 0, nil
|
||||||
|
@ -883,7 +883,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
|
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
if f.Task != taskID {
|
if f.Task != taskID {
|
||||||
return nil, 0, backend.ErrTaskNotFound
|
return nil, 0, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, 0, nil
|
return nil, 0, nil
|
||||||
|
@ -899,7 +899,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
|
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
if f.Task != taskID {
|
if f.Task != taskID {
|
||||||
return nil, 0, backend.ErrNoRunsFound
|
return nil, 0, &platform.ErrNoRunsFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, 0, nil
|
return nil, 0, nil
|
||||||
|
@ -915,7 +915,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
ForceRunFn: func(_ context.Context, tid platform.ID, _ int64) (*platform.Run, error) {
|
ForceRunFn: func(_ context.Context, tid platform.ID, _ int64) (*platform.Run, error) {
|
||||||
if tid != taskID {
|
if tid != taskID {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
|
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
|
||||||
|
@ -932,10 +932,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
FindRunByIDFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) {
|
FindRunByIDFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) {
|
||||||
if tid != taskID {
|
if tid != taskID {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
if rid != runID {
|
if rid != runID {
|
||||||
return nil, backend.ErrRunNotFound
|
return nil, &platform.ErrRunNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
|
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
|
||||||
|
@ -951,10 +951,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
RetryRunFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) {
|
RetryRunFn: func(_ context.Context, tid, rid platform.ID) (*platform.Run, error) {
|
||||||
if tid != taskID {
|
if tid != taskID {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
if rid != runID {
|
if rid != runID {
|
||||||
return nil, backend.ErrRunNotFound
|
return nil, &platform.ErrRunNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
|
return &platform.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
|
||||||
|
@ -970,10 +970,10 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
CancelRunFn: func(_ context.Context, tid, rid platform.ID) error {
|
CancelRunFn: func(_ context.Context, tid, rid platform.ID) error {
|
||||||
if tid != taskID {
|
if tid != taskID {
|
||||||
return backend.ErrTaskNotFound
|
return &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
if rid != runID {
|
if rid != runID {
|
||||||
return backend.ErrRunNotFound
|
return &platform.ErrRunNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1444,7 +1444,7 @@ func TestTaskHandler_Sessions(t *testing.T) {
|
||||||
|
|
||||||
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||||
if id != taskID {
|
if id != taskID {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return &platform.Task{
|
return &platform.Task{
|
||||||
|
@ -1536,7 +1536,7 @@ func TestTaskHandler_Sessions(t *testing.T) {
|
||||||
|
|
||||||
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||||
if id != taskID {
|
if id != taskID {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return &platform.Task{
|
return &platform.Task{
|
||||||
|
@ -1632,7 +1632,7 @@ func TestTaskHandler_Sessions(t *testing.T) {
|
||||||
|
|
||||||
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||||
if id != taskID {
|
if id != taskID {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return &platform.Task{
|
return &platform.Task{
|
||||||
|
@ -1727,7 +1727,7 @@ func TestTaskHandler_Sessions(t *testing.T) {
|
||||||
|
|
||||||
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||||
if id != taskID {
|
if id != taskID {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return &platform.Task{
|
return &platform.Task{
|
||||||
|
|
271
kv/task.go
271
kv/task.go
|
@ -3,7 +3,6 @@ package kv
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -14,84 +13,6 @@ import (
|
||||||
cron "gopkg.in/robfig/cron.v2"
|
cron "gopkg.in/robfig/cron.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
// ErrInvalidTaskID error object for bad id's
|
|
||||||
ErrInvalidTaskID = &influxdb.Error{
|
|
||||||
Code: influxdb.EInvalid,
|
|
||||||
Msg: "invalid id",
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrTaskNotFound error when task cant be found
|
|
||||||
ErrTaskNotFound = &influxdb.Error{
|
|
||||||
Code: influxdb.ENotFound,
|
|
||||||
Msg: "task not found",
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrRunNotFound error when run cant be found
|
|
||||||
ErrRunNotFound = &influxdb.Error{
|
|
||||||
Code: influxdb.ENotFound,
|
|
||||||
Msg: "run not found",
|
|
||||||
}
|
|
||||||
|
|
||||||
ErrPageSizeTooSmall = &influxdb.Error{
|
|
||||||
Msg: "cannot have negative page limit",
|
|
||||||
Code: influxdb.EInvalid,
|
|
||||||
}
|
|
||||||
|
|
||||||
ErrPageSizeTooLarge = &influxdb.Error{
|
|
||||||
Msg: fmt.Sprintf("cannot use page size larger then %d", influxdb.MaxPageSize),
|
|
||||||
Code: influxdb.EInvalid,
|
|
||||||
}
|
|
||||||
|
|
||||||
ErrOrgNotFound = &influxdb.Error{
|
|
||||||
Msg: "organization not found",
|
|
||||||
Code: influxdb.ENotFound,
|
|
||||||
}
|
|
||||||
|
|
||||||
ErrTaskRunAlreadyQueued = &influxdb.Error{
|
|
||||||
Msg: "run already queued",
|
|
||||||
Code: influxdb.EConflict,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func ErrInternalTaskServiceError(err error) *influxdb.Error {
|
|
||||||
return &influxdb.Error{
|
|
||||||
Code: influxdb.EInternal,
|
|
||||||
Msg: fmt.Sprintf("unexpected error in tasks; Err: %v", err),
|
|
||||||
Op: "kv/task",
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrUnexpectedTaskBucketErr a generic error we can use when we rail to retrieve a bucket
|
|
||||||
func ErrUnexpectedTaskBucketErr(err error) *influxdb.Error {
|
|
||||||
return &influxdb.Error{
|
|
||||||
Code: influxdb.EInternal,
|
|
||||||
Msg: fmt.Sprintf("unexpected error retrieving task bucket; Err: %v", err),
|
|
||||||
Op: "kv/taskBucket",
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrTaskTimeParse an error for time parsing errors
|
|
||||||
func ErrTaskTimeParse(err error) *influxdb.Error {
|
|
||||||
return &influxdb.Error{
|
|
||||||
Code: influxdb.EInvalid,
|
|
||||||
Msg: fmt.Sprintf("unexpected error parsing time; Err: %v", err),
|
|
||||||
Op: "kv/taskCron",
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func ErrTaskOptionParse(err error) *influxdb.Error {
|
|
||||||
return &influxdb.Error{
|
|
||||||
Code: influxdb.EInvalid,
|
|
||||||
Msg: fmt.Sprintf("invalid options; Err: %v", err),
|
|
||||||
Op: "kv/taskOptions",
|
|
||||||
Err: err,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Task Storage Schema
|
// Task Storage Schema
|
||||||
// taskBucket:
|
// taskBucket:
|
||||||
// <taskID>: task data storage
|
// <taskID>: task data storage
|
||||||
|
@ -152,19 +73,19 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf
|
||||||
|
|
||||||
b, err := tx.Bucket(taskBucket)
|
b, err := tx.Bucket(taskBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
v, err := b.Get(taskKey)
|
v, err := b.Get(taskKey)
|
||||||
if IsNotFound(err) {
|
if IsNotFound(err) {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &influxdb.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
t := &influxdb.Task{}
|
t := &influxdb.Task{}
|
||||||
if err := json.Unmarshal(v, t); err != nil {
|
if err := json.Unmarshal(v, t); err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID)
|
latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -230,10 +151,10 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt
|
||||||
|
|
||||||
// complain about limits
|
// complain about limits
|
||||||
if filter.Limit < 0 {
|
if filter.Limit < 0 {
|
||||||
return nil, 0, ErrPageSizeTooSmall
|
return nil, 0, &influxdb.ErrPageSizeTooSmall
|
||||||
}
|
}
|
||||||
if filter.Limit > influxdb.TaskMaxPageSize {
|
if filter.Limit > influxdb.TaskMaxPageSize {
|
||||||
return nil, 0, ErrPageSizeTooLarge
|
return nil, 0, &influxdb.ErrPageSizeTooLarge
|
||||||
}
|
}
|
||||||
if filter.Limit == 0 {
|
if filter.Limit == 0 {
|
||||||
filter.Limit = influxdb.TaskDefaultPageSize
|
filter.Limit = influxdb.TaskDefaultPageSize
|
||||||
|
@ -262,7 +183,7 @@ func (s *Service) findTasks(ctx context.Context, tx Tx, filter influxdb.TaskFilt
|
||||||
// findTasksByUser is a subset of the find tasks function. Used for cleanliness
|
// findTasksByUser is a subset of the find tasks function. Used for cleanliness
|
||||||
func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
|
func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
|
||||||
if filter.User == nil {
|
if filter.User == nil {
|
||||||
return nil, 0, ErrTaskNotFound
|
return nil, 0, &influxdb.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
var org *influxdb.Organization
|
var org *influxdb.Organization
|
||||||
var err error
|
var err error
|
||||||
|
@ -295,10 +216,10 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
|
||||||
|
|
||||||
for _, m := range maps {
|
for _, m := range maps {
|
||||||
task, err := s.findTaskByID(ctx, tx, m.ResourceID)
|
task, err := s.findTaskByID(ctx, tx, m.ResourceID)
|
||||||
if err != nil && err == backend.ErrTaskNotFound {
|
if err != nil && err == &influxdb.ErrTaskNotFound {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
if err == backend.ErrTaskNotFound {
|
if err == &influxdb.ErrTaskNotFound {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,19 +253,19 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
|
||||||
}
|
}
|
||||||
|
|
||||||
if org == nil {
|
if org == nil {
|
||||||
return nil, 0, ErrTaskNotFound
|
return nil, 0, &influxdb.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
var ts []*influxdb.Task
|
var ts []*influxdb.Task
|
||||||
|
|
||||||
indexBucket, err := tx.Bucket(taskIndexBucket)
|
indexBucket, err := tx.Bucket(taskIndexBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, ErrUnexpectedTaskBucketErr(err)
|
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := indexBucket.Cursor()
|
c, err := indexBucket.Cursor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, ErrUnexpectedTaskBucketErr(err)
|
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
// we can filter by orgID
|
// we can filter by orgID
|
||||||
if filter.After != nil {
|
if filter.After != nil {
|
||||||
|
@ -360,17 +281,17 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
|
||||||
// orgID
|
// orgID
|
||||||
key, err := org.ID.Encode()
|
key, err := org.ID.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, ErrInvalidTaskID
|
return nil, 0, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
k, v := c.Seek(key)
|
k, v := c.Seek(key)
|
||||||
if k != nil {
|
if k != nil {
|
||||||
id, err := influxdb.IDFromString(string(v))
|
id, err := influxdb.IDFromString(string(v))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, ErrInvalidTaskID
|
return nil, 0, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
|
|
||||||
t, err := s.findTaskByID(ctx, tx, *id)
|
t, err := s.findTaskByID(ctx, tx, *id)
|
||||||
if err != nil && err != backend.ErrTaskNotFound {
|
if err != nil && err != &influxdb.ErrTaskNotFound {
|
||||||
// we might have some crufty index's
|
// we might have some crufty index's
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
@ -396,12 +317,12 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
|
||||||
|
|
||||||
id, err := influxdb.IDFromString(string(v))
|
id, err := influxdb.IDFromString(string(v))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, ErrInvalidTaskID
|
return nil, 0, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
|
|
||||||
t, err := s.findTaskByID(ctx, tx, *id)
|
t, err := s.findTaskByID(ctx, tx, *id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == backend.ErrTaskNotFound {
|
if err == &influxdb.ErrTaskNotFound {
|
||||||
// we might have some crufty index's
|
// we might have some crufty index's
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -432,12 +353,12 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
|
||||||
|
|
||||||
taskBucket, err := tx.Bucket(taskBucket)
|
taskBucket, err := tx.Bucket(taskBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, ErrUnexpectedTaskBucketErr(err)
|
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := taskBucket.Cursor()
|
c, err := taskBucket.Cursor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, ErrUnexpectedTaskBucketErr(err)
|
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
// we can filter by orgID
|
// we can filter by orgID
|
||||||
if filter.After != nil {
|
if filter.After != nil {
|
||||||
|
@ -457,7 +378,7 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
|
||||||
|
|
||||||
t := &influxdb.Task{}
|
t := &influxdb.Task{}
|
||||||
if err := json.Unmarshal(v, t); err != nil {
|
if err := json.Unmarshal(v, t); err != nil {
|
||||||
return nil, 0, ErrInternalTaskServiceError(err)
|
return nil, 0, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID)
|
latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -484,7 +405,7 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
|
||||||
}
|
}
|
||||||
t := &influxdb.Task{}
|
t := &influxdb.Task{}
|
||||||
if err := json.Unmarshal(v, t); err != nil {
|
if err := json.Unmarshal(v, t); err != nil {
|
||||||
return nil, 0, ErrInternalTaskServiceError(err)
|
return nil, 0, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID)
|
latestCompleted, err := s.findLatestCompletedTime(ctx, tx, t.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -557,12 +478,12 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if org == nil {
|
if org == nil {
|
||||||
return nil, ErrOrgNotFound
|
return nil, &influxdb.ErrOrgNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
opt, err := options.FromScript(tc.Flux)
|
opt, err := options.FromScript(tc.Flux)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrTaskOptionParse(err)
|
return nil, influxdb.ErrTaskOptionParse(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if tc.Status == "" {
|
if tc.Status == "" {
|
||||||
|
@ -590,17 +511,17 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
|
||||||
|
|
||||||
taskBucket, err := tx.Bucket(taskBucket)
|
taskBucket, err := tx.Bucket(taskBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
indexBucket, err := tx.Bucket(taskIndexBucket)
|
indexBucket, err := tx.Bucket(taskIndexBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
taskBytes, err := json.Marshal(task)
|
taskBytes, err := json.Marshal(task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
taskKey, err := taskKey(task.ID)
|
taskKey, err := taskKey(task.ID)
|
||||||
|
@ -616,13 +537,13 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
|
||||||
// write the task
|
// write the task
|
||||||
err = taskBucket.Put(taskKey, taskBytes)
|
err = taskBucket.Put(taskKey, taskBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// write the org index
|
// write the org index
|
||||||
err = indexBucket.Put(orgKey, taskKey)
|
err = indexBucket.Put(orgKey, taskKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
if err := s.createUserResourceMapping(ctx, tx, &influxdb.UserResourceMapping{
|
if err := s.createUserResourceMapping(ctx, tx, &influxdb.UserResourceMapping{
|
||||||
ResourceType: influxdb.TasksResourceType,
|
ResourceType: influxdb.TasksResourceType,
|
||||||
|
@ -670,7 +591,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
|
||||||
|
|
||||||
options, err := options.FromScript(*upd.Flux)
|
options, err := options.FromScript(*upd.Flux)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrTaskOptionParse(err)
|
return nil, influxdb.ErrTaskOptionParse(err)
|
||||||
}
|
}
|
||||||
task.Name = options.Name
|
task.Name = options.Name
|
||||||
task.Every = options.Every.String()
|
task.Every = options.Every.String()
|
||||||
|
@ -705,7 +626,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
|
||||||
// save the updated task
|
// save the updated task
|
||||||
bucket, err := tx.Bucket(taskBucket)
|
bucket, err := tx.Bucket(taskBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
key, err := taskKey(id)
|
key, err := taskKey(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -714,7 +635,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
|
||||||
|
|
||||||
taskBytes, err := json.Marshal(task)
|
taskBytes, err := json.Marshal(task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return task, bucket.Put(key, taskBytes)
|
return task, bucket.Put(key, taskBytes)
|
||||||
|
@ -739,17 +660,17 @@ func (s *Service) DeleteTask(ctx context.Context, id influxdb.ID) error {
|
||||||
func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
|
func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
|
||||||
taskBucket, err := tx.Bucket(taskBucket)
|
taskBucket, err := tx.Bucket(taskBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runBucket, err := tx.Bucket(taskRunBucket)
|
runBucket, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
indexBucket, err := tx.Bucket(taskIndexBucket)
|
indexBucket, err := tx.Bucket(taskIndexBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// retrieve the task
|
// retrieve the task
|
||||||
|
@ -765,7 +686,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := indexBucket.Delete(orgKey); err != nil {
|
if err := indexBucket.Delete(orgKey); err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove latest completed
|
// remove latest completed
|
||||||
|
@ -775,7 +696,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := runBucket.Delete(lastCompletedKey); err != nil {
|
if err := runBucket.Delete(lastCompletedKey); err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove the runs
|
// remove the runs
|
||||||
|
@ -791,7 +712,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := runBucket.Delete(key); err != nil {
|
if err := runBucket.Delete(key); err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// remove the task
|
// remove the task
|
||||||
|
@ -801,7 +722,7 @@ func (s *Service) deleteTask(ctx context.Context, tx Tx, id influxdb.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := taskBucket.Delete(key); err != nil {
|
if err := taskBucket.Delete(key); err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.deleteUserResourceMapping(ctx, tx, influxdb.UserResourceMappingFilter{
|
return s.deleteUserResourceMapping(ctx, tx, influxdb.UserResourceMappingFilter{
|
||||||
|
@ -878,7 +799,7 @@ func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
||||||
return nil, 0, backend.ErrOutOfBoundsLimit
|
return nil, 0, &influxdb.ErrOutOfBoundsLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
var runs []*influxdb.Run
|
var runs []*influxdb.Run
|
||||||
|
@ -930,7 +851,7 @@ func (s *Service) FindRunByID(ctx context.Context, taskID, runID influxdb.ID) (*
|
||||||
func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) {
|
func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) {
|
||||||
bucket, err := tx.Bucket(taskRunBucket)
|
bucket, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
key, err := taskRunKey(taskID, runID)
|
key, err := taskRunKey(taskID, runID)
|
||||||
|
@ -940,14 +861,14 @@ func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID influxdb
|
||||||
runBytes, err := bucket.Get(key)
|
runBytes, err := bucket.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if IsNotFound(err) {
|
if IsNotFound(err) {
|
||||||
return nil, ErrRunNotFound
|
return nil, &influxdb.ErrRunNotFound
|
||||||
}
|
}
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
run := &influxdb.Run{}
|
run := &influxdb.Run{}
|
||||||
err = json.Unmarshal(runBytes, run)
|
err = json.Unmarshal(runBytes, run)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return run, nil
|
return run, nil
|
||||||
|
@ -978,12 +899,12 @@ func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.I
|
||||||
// save
|
// save
|
||||||
bucket, err := tx.Bucket(taskBucket)
|
bucket, err := tx.Bucket(taskBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runBytes, err := json.Marshal(run)
|
runBytes, err := json.Marshal(run)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrInternalTaskServiceError(err)
|
return influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runKey, err := taskRunKey(taskID, runID)
|
runKey, err := taskRunKey(taskID, runID)
|
||||||
|
@ -992,7 +913,7 @@ func (s *Service) cancelRun(ctx context.Context, tx Tx, taskID, runID influxdb.I
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := bucket.Put(runKey, runBytes); err != nil {
|
if err := bucket.Put(runKey, runBytes); err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1028,7 +949,7 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID
|
||||||
// add a clean copy of the run to the manual runs
|
// add a clean copy of the run to the manual runs
|
||||||
bucket, err := tx.Bucket(taskRunBucket)
|
bucket, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
key, err := taskManualRunKey(taskID)
|
key, err := taskManualRunKey(taskID)
|
||||||
|
@ -1040,15 +961,15 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID
|
||||||
runsBytes, err := bucket.Get(key)
|
runsBytes, err := bucket.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != ErrKeyNotFound {
|
if err != ErrKeyNotFound {
|
||||||
return nil, ErrRunNotFound
|
return nil, &influxdb.ErrRunNotFound
|
||||||
}
|
}
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if runsBytes != nil {
|
if runsBytes != nil {
|
||||||
if err := json.Unmarshal(runsBytes, &runs); err != nil {
|
if err := json.Unmarshal(runsBytes, &runs); err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1057,11 +978,11 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID
|
||||||
// save manual runs
|
// save manual runs
|
||||||
runsBytes, err = json.Marshal(runs)
|
runsBytes, err = json.Marshal(runs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := bucket.Put(key, runsBytes); err != nil {
|
if err := bucket.Put(key, runsBytes); err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
|
@ -1097,7 +1018,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched
|
||||||
// add a clean copy of the run to the manual runs
|
// add a clean copy of the run to the manual runs
|
||||||
bucket, err := tx.Bucket(taskRunBucket)
|
bucket, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runs, err := s.manualRuns(ctx, tx, taskID)
|
runs, err := s.manualRuns(ctx, tx, taskID)
|
||||||
|
@ -1108,7 +1029,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched
|
||||||
// check to see if this run is already queued
|
// check to see if this run is already queued
|
||||||
for _, run := range runs {
|
for _, run := range runs {
|
||||||
if run.ScheduledFor == r.ScheduledFor {
|
if run.ScheduledFor == r.ScheduledFor {
|
||||||
return nil, ErrTaskRunAlreadyQueued
|
return nil, &influxdb.ErrTaskRunAlreadyQueued
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
runs = append(runs, r)
|
runs = append(runs, r)
|
||||||
|
@ -1116,7 +1037,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched
|
||||||
// save manual runs
|
// save manual runs
|
||||||
runsBytes, err := json.Marshal(runs)
|
runsBytes, err := json.Marshal(runs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
key, err := taskManualRunKey(taskID)
|
key, err := taskManualRunKey(taskID)
|
||||||
|
@ -1125,7 +1046,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := bucket.Put(key, runsBytes); err != nil {
|
if err := bucket.Put(key, runsBytes); err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
|
@ -1165,11 +1086,11 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
|
||||||
// save manual runs
|
// save manual runs
|
||||||
b, err := tx.Bucket(taskRunBucket)
|
b, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err)
|
return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
mRunsBytes, err := json.Marshal(mRuns)
|
mRunsBytes, err := json.Marshal(mRuns)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrInternalTaskServiceError(err)
|
return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runsKey, err := taskManualRunKey(taskID)
|
runsKey, err := taskManualRunKey(taskID)
|
||||||
|
@ -1178,12 +1099,12 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.Put(runsKey, mRunsBytes); err != nil {
|
if err := b.Put(runsKey, mRunsBytes); err != nil {
|
||||||
return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err)
|
return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
// add mRun to the list of currently running
|
// add mRun to the list of currently running
|
||||||
mRunBytes, err := json.Marshal(mRun)
|
mRunBytes, err := json.Marshal(mRun)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrInternalTaskServiceError(err)
|
return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runKey, err := taskRunKey(taskID, mRun.ID)
|
runKey, err := taskRunKey(taskID, mRun.ID)
|
||||||
|
@ -1192,7 +1113,7 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.Put(runKey, mRunBytes); err != nil {
|
if err := b.Put(runKey, mRunBytes); err != nil {
|
||||||
return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err)
|
return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// return mRun
|
// return mRun
|
||||||
|
@ -1231,7 +1152,7 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
|
||||||
// the earliest it could have been completed is "created at"
|
// the earliest it could have been completed is "created at"
|
||||||
latestCompleted, err := time.Parse(time.RFC3339, task.CreatedAt)
|
latestCompleted, err := time.Parse(time.RFC3339, task.CreatedAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrTaskTimeParse(err)
|
return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// we could have a latest completed newer then the created at time.
|
// we could have a latest completed newer then the created at time.
|
||||||
|
@ -1281,21 +1202,21 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
|
||||||
// create a run if possible
|
// create a run if possible
|
||||||
sch, err := cron.Parse(task.EffectiveCron())
|
sch, err := cron.Parse(task.EffectiveCron())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrTaskTimeParse(err)
|
return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err)
|
||||||
}
|
}
|
||||||
nowTime := time.Unix(now, 0)
|
nowTime := time.Unix(now, 0)
|
||||||
nextScheduled := sch.Next(latestCompleted).UTC()
|
nextScheduled := sch.Next(latestCompleted).UTC()
|
||||||
nextScheduledUnix := nextScheduled.Unix()
|
nextScheduledUnix := nextScheduled.Unix()
|
||||||
offset := &options.Duration{}
|
offset := &options.Duration{}
|
||||||
if err := offset.Parse(task.Offset); err != nil {
|
if err := offset.Parse(task.Offset); err != nil {
|
||||||
return backend.RunCreation{}, ErrTaskTimeParse(err)
|
return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err)
|
||||||
}
|
}
|
||||||
dueAt, err := offset.Add(nextScheduled)
|
dueAt, err := offset.Add(nextScheduled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrTaskTimeParse(err)
|
return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err)
|
||||||
}
|
}
|
||||||
if dueAt.After(nowTime) {
|
if dueAt.After(nowTime) {
|
||||||
return backend.RunCreation{}, backend.RunNotYetDueError{DueAt: dueAt.Unix()}
|
return backend.RunCreation{}, influxdb.ErrRunNotDueYet(dueAt.Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
id := s.IDGenerator.ID()
|
id := s.IDGenerator.ID()
|
||||||
|
@ -1309,12 +1230,12 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
|
||||||
}
|
}
|
||||||
b, err := tx.Bucket(taskRunBucket)
|
b, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err)
|
return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runBytes, err := json.Marshal(run)
|
runBytes, err := json.Marshal(run)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrInternalTaskServiceError(err)
|
return backend.RunCreation{}, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runKey, err := taskRunKey(taskID, run.ID)
|
runKey, err := taskRunKey(taskID, run.ID)
|
||||||
|
@ -1322,12 +1243,12 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
|
||||||
return backend.RunCreation{}, err
|
return backend.RunCreation{}, err
|
||||||
}
|
}
|
||||||
if err := b.Put(runKey, runBytes); err != nil {
|
if err := b.Put(runKey, runBytes); err != nil {
|
||||||
return backend.RunCreation{}, ErrUnexpectedTaskBucketErr(err)
|
return backend.RunCreation{}, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
nextDue, err := offset.Add(sch.Next(nextScheduled))
|
nextDue, err := offset.Add(sch.Next(nextScheduled))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return backend.RunCreation{}, ErrTaskTimeParse(err)
|
return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err)
|
||||||
}
|
}
|
||||||
// populate RunCreation
|
// populate RunCreation
|
||||||
return backend.RunCreation{
|
return backend.RunCreation{
|
||||||
|
@ -1362,12 +1283,12 @@ func (s *Service) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*
|
||||||
func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) {
|
func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) {
|
||||||
bucket, err := tx.Bucket(taskRunBucket)
|
bucket, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := bucket.Cursor()
|
c, err := bucket.Cursor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
var runs []*influxdb.Run
|
var runs []*influxdb.Run
|
||||||
|
|
||||||
|
@ -1387,7 +1308,7 @@ func (s *Service) currentlyRunning(ctx context.Context, tx Tx, taskID influxdb.I
|
||||||
}
|
}
|
||||||
r := &influxdb.Run{}
|
r := &influxdb.Run{}
|
||||||
if err := json.Unmarshal(v, r); err != nil {
|
if err := json.Unmarshal(v, r); err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the run no longer belongs to the task we are done
|
// if the run no longer belongs to the task we are done
|
||||||
|
@ -1420,7 +1341,7 @@ func (s *Service) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influx
|
||||||
func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) {
|
func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([]*influxdb.Run, error) {
|
||||||
b, err := tx.Bucket(taskRunBucket)
|
b, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
key, err := taskManualRunKey(taskID)
|
key, err := taskManualRunKey(taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1433,10 +1354,10 @@ func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([]
|
||||||
if err == ErrKeyNotFound {
|
if err == ErrKeyNotFound {
|
||||||
return runs, nil
|
return runs, nil
|
||||||
}
|
}
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(val, &runs); err != nil {
|
if err := json.Unmarshal(val, &runs); err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return runs, nil
|
return runs, nil
|
||||||
|
@ -1481,13 +1402,13 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I
|
||||||
}
|
}
|
||||||
bucket, err := tx.Bucket(taskRunBucket)
|
bucket, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rTime.After(lTime) {
|
if rTime.After(lTime) {
|
||||||
rb, err := json.Marshal(r)
|
rb, err := json.Marshal(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
lKey, err := taskLatestCompletedKey(taskID)
|
lKey, err := taskLatestCompletedKey(taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1495,7 +1416,7 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := bucket.Put(lKey, rb); err != nil {
|
if err := bucket.Put(lKey, rb); err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1505,7 +1426,7 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := bucket.Delete(key); err != nil {
|
if err := bucket.Delete(key); err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
|
@ -1544,7 +1465,7 @@ func (s *Service) nextDueRun(ctx context.Context, tx Tx, taskID influxdb.ID) (in
|
||||||
// create a run if possible
|
// create a run if possible
|
||||||
sch, err := cron.Parse(task.EffectiveCron())
|
sch, err := cron.Parse(task.EffectiveCron())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, ErrTaskTimeParse(err)
|
return 0, influxdb.ErrTaskTimeParse(err)
|
||||||
}
|
}
|
||||||
nextScheduled := sch.Next(latestCompleted).UTC()
|
nextScheduled := sch.Next(latestCompleted).UTC()
|
||||||
|
|
||||||
|
@ -1582,12 +1503,12 @@ func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influ
|
||||||
// save run
|
// save run
|
||||||
b, err := tx.Bucket(taskRunBucket)
|
b, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runBytes, err := json.Marshal(run)
|
runBytes, err := json.Marshal(run)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrInternalTaskServiceError(err)
|
return influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runKey, err := taskRunKey(taskID, run.ID)
|
runKey, err := taskRunKey(taskID, run.ID)
|
||||||
|
@ -1595,7 +1516,7 @@ func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influ
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := b.Put(runKey, runBytes); err != nil {
|
if err := b.Put(runKey, runBytes); err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1625,12 +1546,12 @@ func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.I
|
||||||
// save run
|
// save run
|
||||||
b, err := tx.Bucket(taskRunBucket)
|
b, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runBytes, err := json.Marshal(run)
|
runBytes, err := json.Marshal(run)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrInternalTaskServiceError(err)
|
return influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runKey, err := taskRunKey(taskID, run.ID)
|
runKey, err := taskRunKey(taskID, run.ID)
|
||||||
|
@ -1639,7 +1560,7 @@ func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.I
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.Put(runKey, runBytes); err != nil {
|
if err := b.Put(runKey, runBytes); err != nil {
|
||||||
return ErrUnexpectedTaskBucketErr(err)
|
return influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1648,7 +1569,7 @@ func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.I
|
||||||
func (s *Service) findLatestCompleted(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Run, error) {
|
func (s *Service) findLatestCompleted(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Run, error) {
|
||||||
bucket, err := tx.Bucket(taskRunBucket)
|
bucket, err := tx.Bucket(taskRunBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
key, err := taskLatestCompletedKey(id)
|
key, err := taskLatestCompletedKey(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1660,12 +1581,12 @@ func (s *Service) findLatestCompleted(ctx context.Context, tx Tx, id influxdb.ID
|
||||||
if err == ErrKeyNotFound {
|
if err == ErrKeyNotFound {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return nil, ErrUnexpectedTaskBucketErr(err)
|
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
run := &influxdb.Run{}
|
run := &influxdb.Run{}
|
||||||
if err = json.Unmarshal(bytes, run); err != nil {
|
if err = json.Unmarshal(bytes, run); err != nil {
|
||||||
return nil, ErrInternalTaskServiceError(err)
|
return nil, influxdb.ErrInternalTaskServiceError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return run, nil
|
return run, nil
|
||||||
|
@ -1686,7 +1607,7 @@ func (s *Service) findLatestCompletedTime(ctx context.Context, tx Tx, id influxd
|
||||||
func taskKey(taskID influxdb.ID) ([]byte, error) {
|
func taskKey(taskID influxdb.ID) ([]byte, error) {
|
||||||
encodedID, err := taskID.Encode()
|
encodedID, err := taskID.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInvalidTaskID
|
return nil, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
return encodedID, nil
|
return encodedID, nil
|
||||||
}
|
}
|
||||||
|
@ -1694,7 +1615,7 @@ func taskKey(taskID influxdb.ID) ([]byte, error) {
|
||||||
func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) {
|
func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) {
|
||||||
encodedID, err := taskID.Encode()
|
encodedID, err := taskID.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInvalidTaskID
|
return nil, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
return []byte(string(encodedID) + "/latestCompleted"), nil
|
return []byte(string(encodedID) + "/latestCompleted"), nil
|
||||||
}
|
}
|
||||||
|
@ -1702,7 +1623,7 @@ func taskLatestCompletedKey(taskID influxdb.ID) ([]byte, error) {
|
||||||
func taskManualRunKey(taskID influxdb.ID) ([]byte, error) {
|
func taskManualRunKey(taskID influxdb.ID) ([]byte, error) {
|
||||||
encodedID, err := taskID.Encode()
|
encodedID, err := taskID.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInvalidTaskID
|
return nil, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
return []byte(string(encodedID) + "/manualRuns"), nil
|
return []byte(string(encodedID) + "/manualRuns"), nil
|
||||||
}
|
}
|
||||||
|
@ -1710,11 +1631,11 @@ func taskManualRunKey(taskID influxdb.ID) ([]byte, error) {
|
||||||
func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) {
|
func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) {
|
||||||
encodedOrgID, err := orgID.Encode()
|
encodedOrgID, err := orgID.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInvalidTaskID
|
return nil, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
encodedID, err := taskID.Encode()
|
encodedID, err := taskID.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInvalidTaskID
|
return nil, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
|
|
||||||
return []byte(string(encodedOrgID) + "/" + string(encodedID)), nil
|
return []byte(string(encodedOrgID) + "/" + string(encodedID)), nil
|
||||||
|
@ -1723,11 +1644,11 @@ func taskOrgKey(orgID, taskID influxdb.ID) ([]byte, error) {
|
||||||
func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) {
|
func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) {
|
||||||
encodedID, err := taskID.Encode()
|
encodedID, err := taskID.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInvalidTaskID
|
return nil, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
encodedRunID, err := runID.Encode()
|
encodedRunID, err := runID.Encode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrInvalidTaskID
|
return nil, &influxdb.ErrInvalidTaskID
|
||||||
}
|
}
|
||||||
|
|
||||||
return []byte(string(encodedID) + "/" + string(encodedRunID)), nil
|
return []byte(string(encodedID) + "/" + string(encodedRunID)), nil
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -35,20 +34,6 @@ const (
|
||||||
taskSystemBucketID platform.ID = 10
|
taskSystemBucketID platform.ID = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
// ErrTaskNotFound indicates no task could be found for given parameters.
|
|
||||||
ErrTaskNotFound = errors.New("task not found")
|
|
||||||
|
|
||||||
// ErrRunNotFound is returned when searching for a single run that doesn't exist.
|
|
||||||
ErrRunNotFound = errors.New("run not found")
|
|
||||||
)
|
|
||||||
|
|
||||||
// ErrOutOfBoundsLimit is returned with FindRuns is called with an invalid filter limit.
|
|
||||||
var ErrOutOfBoundsLimit = &platform.Error{
|
|
||||||
Code: platform.EUnprocessableEntity,
|
|
||||||
Msg: "run limit is out of bounds, must be between 1 and 500",
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware
|
// NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware
|
||||||
func NewAnalyticalStorage(logger *zap.Logger, ts influxdb.TaskService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage {
|
func NewAnalyticalStorage(logger *zap.Logger, ts influxdb.TaskService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage {
|
||||||
return &AnalyticalStorage{
|
return &AnalyticalStorage{
|
||||||
|
@ -157,7 +142,7 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
|
||||||
}
|
}
|
||||||
|
|
||||||
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
||||||
return nil, 0, ErrOutOfBoundsLimit
|
return nil, 0, &influxdb.ErrOutOfBoundsLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
runs, n, err := as.TaskService.FindRuns(ctx, filter)
|
runs, n, err := as.TaskService.FindRuns(ctx, filter)
|
||||||
|
@ -270,7 +255,7 @@ func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID infl
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(re.runs) == 0 {
|
if len(re.runs) == 0 {
|
||||||
return nil, ErrRunNotFound
|
return nil, &platform.ErrRunNotFound
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -121,12 +121,12 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo
|
||||||
|
|
||||||
// If disabling the task, do so before modifying the script.
|
// If disabling the task, do so before modifying the script.
|
||||||
if task.Status != oldTask.Status && task.Status == string(backend.TaskInactive) {
|
if task.Status != oldTask.Status && task.Status == string(backend.TaskInactive) {
|
||||||
if err := c.sch.ReleaseTask(id); err != nil && err != backend.ErrTaskNotClaimed {
|
if err := c.sch.ReleaseTask(id); err != nil && err != &platform.ErrTaskNotClaimed {
|
||||||
return task, err
|
return task, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.sch.UpdateTask(ctx, task); err != nil && err != backend.ErrTaskNotClaimed {
|
if err := c.sch.UpdateTask(ctx, task); err != nil && err != &platform.ErrTaskNotClaimed {
|
||||||
return task, err
|
return task, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,7 +139,7 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo
|
||||||
return task, err
|
return task, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.sch.ClaimTask(ctx, task); err != nil && err != backend.ErrTaskAlreadyClaimed {
|
if err := c.sch.ClaimTask(ctx, task); err != nil && err != &platform.ErrTaskAlreadyClaimed {
|
||||||
return task, err
|
return task, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -148,7 +148,7 @@ func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) error {
|
func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||||
if err := c.sch.ReleaseTask(id); err != nil && err != backend.ErrTaskNotClaimed {
|
if err := c.sch.ReleaseTask(id); err != nil && err != &platform.ErrTaskNotClaimed {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,7 @@ func inmemTaskService() platform.TaskService {
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
t, ok := tasks[id]
|
t, ok := tasks[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
if upd.Flux != nil {
|
if upd.Flux != nil {
|
||||||
t.Flux = *upd.Flux
|
t.Flux = *upd.Flux
|
||||||
|
@ -78,7 +78,7 @@ func inmemTaskService() platform.TaskService {
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
t, ok := tasks[id]
|
t, ok := tasks[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
newt := *t
|
newt := *t
|
||||||
return &newt, nil
|
return &newt, nil
|
||||||
|
@ -100,7 +100,7 @@ func inmemTaskService() platform.TaskService {
|
||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
t, ok := tasks[id]
|
t, ok := tasks[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, backend.ErrTaskNotFound
|
return nil, &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return &platform.Run{ID: id, TaskID: t.ID, ScheduledFor: time.Unix(scheduledFor, 0).Format(time.RFC3339)}, nil
|
return &platform.Run{ID: id, TaskID: t.ID, ScheduledFor: time.Unix(scheduledFor, 0).Format(time.RFC3339)}, nil
|
||||||
|
@ -276,7 +276,7 @@ func TestCoordinator_DeleteUnclaimedTask(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := ts.FindTaskByID(context.Background(), task.ID); err != backend.ErrTaskNotFound {
|
if _, err := ts.FindTaskByID(context.Background(), task.ID); err != &platform.ErrTaskNotFound {
|
||||||
t.Fatalf("expected deleted task not to be found; got %v", err)
|
t.Fatalf("expected deleted task not to be found; got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,7 +126,7 @@ func (p *syncRunPromise) Wait() (backend.RunResult, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syncRunPromise) Cancel() {
|
func (p *syncRunPromise) Cancel() {
|
||||||
p.finish(nil, backend.ErrRunCanceled)
|
p.finish(nil, &influxdb.ErrRunCanceled)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *syncRunPromise) finish(res *runResult, err error) {
|
func (p *syncRunPromise) finish(res *runResult, err error) {
|
||||||
|
@ -312,7 +312,7 @@ func (p *asyncRunPromise) Wait() (backend.RunResult, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *asyncRunPromise) Cancel() {
|
func (p *asyncRunPromise) Cancel() {
|
||||||
p.finish(nil, backend.ErrRunCanceled)
|
p.finish(nil, &influxdb.ErrRunCanceled)
|
||||||
}
|
}
|
||||||
|
|
||||||
// followQuery waits for the query to become ready and sets p's results.
|
// followQuery waits for the query to become ready and sets p's results.
|
||||||
|
|
|
@ -417,7 +417,7 @@ func testExecutorPromiseCancel(t *testing.T, fn createSysFn) {
|
||||||
rp.Cancel()
|
rp.Cancel()
|
||||||
|
|
||||||
res, err := rp.Wait()
|
res, err := rp.Wait()
|
||||||
if err != backend.ErrRunCanceled {
|
if err != &platform.ErrRunCanceled {
|
||||||
t.Fatalf("expected ErrRunCanceled, got %v", err)
|
t.Fatalf("expected ErrRunCanceled, got %v", err)
|
||||||
}
|
}
|
||||||
if res != nil {
|
if res != nil {
|
||||||
|
|
|
@ -197,13 +197,13 @@ func (s *TickScheduler) CancelRun(_ context.Context, taskID, runID platform.ID)
|
||||||
defer s.schedulerMu.Unlock()
|
defer s.schedulerMu.Unlock()
|
||||||
ts, ok := s.taskSchedulers[taskID]
|
ts, ok := s.taskSchedulers[taskID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrTaskNotFound
|
return &platform.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
ts.runningMu.Lock()
|
ts.runningMu.Lock()
|
||||||
c, ok := ts.running[runID]
|
c, ok := ts.running[runID]
|
||||||
if !ok {
|
if !ok {
|
||||||
ts.runningMu.Unlock()
|
ts.runningMu.Unlock()
|
||||||
return ErrRunNotFound
|
return &platform.ErrRunNotFound
|
||||||
}
|
}
|
||||||
ts.runningMu.Unlock()
|
ts.runningMu.Unlock()
|
||||||
if c.CancelFunc != nil {
|
if c.CancelFunc != nil {
|
||||||
|
@ -302,7 +302,7 @@ func (s *TickScheduler) ClaimTask(authCtx context.Context, task *platform.Task)
|
||||||
|
|
||||||
_, ok := s.taskSchedulers[task.ID]
|
_, ok := s.taskSchedulers[task.ID]
|
||||||
if ok {
|
if ok {
|
||||||
return ErrTaskAlreadyClaimed
|
return &platform.ErrTaskAlreadyClaimed
|
||||||
}
|
}
|
||||||
|
|
||||||
s.taskSchedulers[task.ID] = ts
|
s.taskSchedulers[task.ID] = ts
|
||||||
|
@ -336,7 +336,7 @@ func (s *TickScheduler) UpdateTask(authCtx context.Context, task *platform.Task)
|
||||||
|
|
||||||
ts, ok := s.taskSchedulers[task.ID]
|
ts, ok := s.taskSchedulers[task.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrTaskNotClaimed
|
return &platform.ErrTaskNotClaimed
|
||||||
}
|
}
|
||||||
ts.task = task
|
ts.task = task
|
||||||
|
|
||||||
|
@ -390,7 +390,7 @@ func (s *TickScheduler) ReleaseTask(taskID platform.ID) error {
|
||||||
|
|
||||||
t, ok := s.taskSchedulers[taskID]
|
t, ok := s.taskSchedulers[taskID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrTaskNotClaimed
|
return &platform.ErrTaskNotClaimed
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Cancel()
|
t.Cancel()
|
||||||
|
@ -746,7 +746,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
|
||||||
rr, err := rp.Wait()
|
rr, err := rp.Wait()
|
||||||
close(ready)
|
close(ready)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ErrRunCanceled {
|
if err == &platform.ErrRunCanceled {
|
||||||
r.updateRunState(qr, RunCanceled, runLogger)
|
r.updateRunState(qr, RunCanceled, runLogger)
|
||||||
errMsg = "Waiting for execution result failed, " + errMsg
|
errMsg = "Waiting for execution result failed, " + errMsg
|
||||||
// Move on to the next execution, for a canceled run.
|
// Move on to the next execution, for a canceled run.
|
||||||
|
|
|
@ -64,7 +64,7 @@ func TestScheduler_Cancelation(t *testing.T) {
|
||||||
|
|
||||||
// check for when we cancel something already canceled
|
// check for when we cancel something already canceled
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
if err = o.CancelRun(context.Background(), task.ID, run.ID); err != backend.ErrRunNotFound {
|
if err = o.CancelRun(context.Background(), task.ID, run.ID); err != &platform.ErrRunNotFound {
|
||||||
t.Fatalf("expected ErrRunNotFound but got %s", err)
|
t.Fatalf("expected ErrRunNotFound but got %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package backend
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -15,7 +14,7 @@ type TaskControlService interface {
|
||||||
// CreateNextRun attempts to create a new run.
|
// CreateNextRun attempts to create a new run.
|
||||||
// The new run's ScheduledFor is assigned the earliest possible time according to task's cron,
|
// The new run's ScheduledFor is assigned the earliest possible time according to task's cron,
|
||||||
// that is later than any in-progress run and LatestCompleted run.
|
// that is later than any in-progress run and LatestCompleted run.
|
||||||
// If the run's ScheduledFor would be later than the passed-in now, CreateNextRun returns a RunNotYetDueError.
|
// If the run's ScheduledFor would be later than the passed-in now, CreateNextRun returns an ErrRunNotDueYet.
|
||||||
CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (RunCreation, error)
|
CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (RunCreation, error)
|
||||||
|
|
||||||
CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
|
CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
|
||||||
|
@ -70,30 +69,6 @@ func (r RunStatus) String() string {
|
||||||
panic(fmt.Sprintf("unknown RunStatus: %d", r))
|
panic(fmt.Sprintf("unknown RunStatus: %d", r))
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
// ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally.
|
|
||||||
ErrRunCanceled = errors.New("run canceled")
|
|
||||||
|
|
||||||
// ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not.
|
|
||||||
ErrTaskNotClaimed = errors.New("task not claimed")
|
|
||||||
|
|
||||||
// ErrNoRunsFound is returned when searching for a range of runs, but none are found.
|
|
||||||
ErrNoRunsFound = errors.New("no matching runs found")
|
|
||||||
|
|
||||||
// ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is.
|
|
||||||
ErrTaskAlreadyClaimed = errors.New("task already claimed")
|
|
||||||
)
|
|
||||||
|
|
||||||
// RunNotYetDueError is returned from CreateNextRun if a run is not yet due.
|
|
||||||
type RunNotYetDueError struct {
|
|
||||||
// DueAt is the unix timestamp of when the next run is due.
|
|
||||||
DueAt int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e RunNotYetDueError) Error() string {
|
|
||||||
return "run not due until " + time.Unix(e.DueAt, 0).UTC().Format(time.RFC3339)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
|
// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
|
||||||
type RequestStillQueuedError struct {
|
type RequestStillQueuedError struct {
|
||||||
// Unix timestamps matching existing request's start and end.
|
// Unix timestamps matching existing request's start and end.
|
||||||
|
|
|
@ -62,7 +62,7 @@ func (s *Scheduler) ClaimTask(_ context.Context, task *platform.Task) error {
|
||||||
|
|
||||||
_, ok := s.claims[task.ID]
|
_, ok := s.claims[task.ID]
|
||||||
if ok {
|
if ok {
|
||||||
return backend.ErrTaskAlreadyClaimed
|
return &platform.ErrTaskAlreadyClaimed
|
||||||
}
|
}
|
||||||
|
|
||||||
s.claims[task.ID] = task
|
s.claims[task.ID] = task
|
||||||
|
@ -80,7 +80,7 @@ func (s *Scheduler) UpdateTask(_ context.Context, task *platform.Task) error {
|
||||||
|
|
||||||
_, ok := s.claims[task.ID]
|
_, ok := s.claims[task.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return backend.ErrTaskNotClaimed
|
return &platform.ErrTaskNotClaimed
|
||||||
}
|
}
|
||||||
|
|
||||||
s.claims[task.ID] = task
|
s.claims[task.ID] = task
|
||||||
|
@ -102,7 +102,7 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error {
|
||||||
|
|
||||||
t, ok := s.claims[taskID]
|
t, ok := s.claims[taskID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return backend.ErrTaskNotClaimed
|
return &platform.ErrTaskNotClaimed
|
||||||
}
|
}
|
||||||
if s.releaseChan != nil {
|
if s.releaseChan != nil {
|
||||||
s.releaseChan <- t
|
s.releaseChan <- t
|
||||||
|
@ -302,7 +302,7 @@ func (p *RunPromise) Wait() (backend.RunResult, error) {
|
||||||
|
|
||||||
func (p *RunPromise) Cancel() {
|
func (p *RunPromise) Cancel() {
|
||||||
p.cancelFunc()
|
p.cancelFunc()
|
||||||
p.Finish(nil, backend.ErrRunCanceled)
|
p.Finish(nil, &platform.ErrRunCanceled)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finish unblocks any call to Wait, to return r and err.
|
// Finish unblocks any call to Wait, to return r and err.
|
||||||
|
|
|
@ -139,7 +139,7 @@ func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (back
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if dueAt := nextScheduledUnix + int64(offset); dueAt > now {
|
if dueAt := nextScheduledUnix + int64(offset); dueAt > now {
|
||||||
return backend.RunCreation{}, backend.RunNotYetDueError{DueAt: dueAt}
|
return backend.RunCreation{}, influxdb.ErrRunNotDueYet(dueAt)
|
||||||
}
|
}
|
||||||
|
|
||||||
runID := idgen.ID()
|
runID := idgen.ID()
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
package options
|
package options
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -234,7 +233,7 @@ func FromScript(script string) (Options, error) {
|
||||||
// pull options from the program scope
|
// pull options from the program scope
|
||||||
task, ok := scope.Lookup("task")
|
task, ok := scope.Lookup("task")
|
||||||
if !ok {
|
if !ok {
|
||||||
return opt, errors.New("missing required option: 'task'")
|
return opt, ErrMissingRequiredTaskOption("task")
|
||||||
}
|
}
|
||||||
// check to make sure task is an object
|
// check to make sure task is an object
|
||||||
if err := checkNature(task.PolyType().Nature(), semantic.Object); err != nil {
|
if err := checkNature(task.PolyType().Nature(), semantic.Object); err != nil {
|
||||||
|
@ -247,7 +246,7 @@ func FromScript(script string) (Options, error) {
|
||||||
|
|
||||||
nameVal, ok := optObject.Get(optName)
|
nameVal, ok := optObject.Get(optName)
|
||||||
if !ok {
|
if !ok {
|
||||||
return opt, errors.New("missing name in task options")
|
return opt, ErrMissingRequiredTaskOption("name")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := checkNature(nameVal.PolyType().Nature(), semantic.String); err != nil {
|
if err := checkNature(nameVal.PolyType().Nature(), semantic.String); err != nil {
|
||||||
|
@ -257,11 +256,11 @@ func FromScript(script string) (Options, error) {
|
||||||
crVal, cronOK := optObject.Get(optCron)
|
crVal, cronOK := optObject.Get(optCron)
|
||||||
everyVal, everyOK := optObject.Get(optEvery)
|
everyVal, everyOK := optObject.Get(optEvery)
|
||||||
if cronOK && everyOK {
|
if cronOK && everyOK {
|
||||||
return opt, errors.New("cannot use both cron and every in task options")
|
return opt, ErrDuplicateIntervalField
|
||||||
}
|
}
|
||||||
|
|
||||||
if !cronOK && !everyOK {
|
if !cronOK && !everyOK {
|
||||||
return opt, errors.New("cron or every is required")
|
return opt, ErrMissingRequiredTaskOption("cron or every is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
if cronOK {
|
if cronOK {
|
||||||
|
@ -277,14 +276,14 @@ func FromScript(script string) (Options, error) {
|
||||||
}
|
}
|
||||||
dur, ok := durTypes["every"]
|
dur, ok := durTypes["every"]
|
||||||
if !ok || dur == nil {
|
if !ok || dur == nil {
|
||||||
return opt, errors.New("failed to parse `every` in task")
|
return opt, ErrParseTaskOptionField("every")
|
||||||
}
|
}
|
||||||
durNode, err := parseSignedDuration(dur.Location().Source)
|
durNode, err := parseSignedDuration(dur.Location().Source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return opt, err
|
return opt, err
|
||||||
}
|
}
|
||||||
if !ok || durNode == nil {
|
if !ok || durNode == nil {
|
||||||
return opt, errors.New("failed to parse `every` in task")
|
return opt, ErrParseTaskOptionField("every")
|
||||||
}
|
}
|
||||||
durNode.BaseNode = ast.BaseNode{}
|
durNode.BaseNode = ast.BaseNode{}
|
||||||
opt.Every.Node = *durNode
|
opt.Every.Node = *durNode
|
||||||
|
@ -296,14 +295,14 @@ func FromScript(script string) (Options, error) {
|
||||||
}
|
}
|
||||||
dur, ok := durTypes["offset"]
|
dur, ok := durTypes["offset"]
|
||||||
if !ok || dur == nil {
|
if !ok || dur == nil {
|
||||||
return opt, errors.New("failed to parse `offset` in task")
|
return opt, ErrParseTaskOptionField("offset")
|
||||||
}
|
}
|
||||||
durNode, err := parseSignedDuration(dur.Location().Source)
|
durNode, err := parseSignedDuration(dur.Location().Source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return opt, err
|
return opt, err
|
||||||
}
|
}
|
||||||
if !ok || durNode == nil {
|
if !ok || durNode == nil {
|
||||||
return opt, errors.New("failed to parse `offset` in task")
|
return opt, ErrParseTaskOptionField("offset")
|
||||||
}
|
}
|
||||||
durNode.BaseNode = ast.BaseNode{}
|
durNode.BaseNode = ast.BaseNode{}
|
||||||
opt.Offset = &Duration{}
|
opt.Offset = &Duration{}
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
package options
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ErrParseTaskOptionField(opt string) error {
|
||||||
|
return fmt.Errorf("failed to parse field '%s' in task options", opt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ErrMissingRequiredTaskOption(opt string) error {
|
||||||
|
return fmt.Errorf("missing required option: %s", opt)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrDuplicateIntervalField = fmt.Errorf("cannot use both cron and every in task options")
|
||||||
|
)
|
|
@ -401,8 +401,8 @@ func testTaskCRUD(t *testing.T, sys *System) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Task should not be returned.
|
// Task should not be returned.
|
||||||
if _, err := sys.TaskService.FindTaskByID(sys.Ctx, origID); err != backend.ErrTaskNotFound {
|
if _, err := sys.TaskService.FindTaskByID(sys.Ctx, origID); err != &influxdb.ErrTaskNotFound {
|
||||||
t.Fatalf("expected %v, got %v", backend.ErrTaskNotFound, err)
|
t.Fatalf("expected %v, got %v", influxdb.ErrTaskNotFound, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -604,12 +604,12 @@ func testTaskRuns(t *testing.T, sys *System) {
|
||||||
|
|
||||||
// check run filter errors
|
// check run filter errors
|
||||||
_, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1})
|
_, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1})
|
||||||
if err0 != backend.ErrOutOfBoundsLimit {
|
if err0 != &influxdb.ErrOutOfBoundsLimit {
|
||||||
t.Fatalf("failed to error with out of bounds run limit: %d", -1)
|
t.Fatalf("failed to error with out of bounds run limit: %d", -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1})
|
_, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1})
|
||||||
if err1 != backend.ErrOutOfBoundsLimit {
|
if err1 != &influxdb.ErrOutOfBoundsLimit {
|
||||||
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
|
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -685,13 +685,13 @@ func testTaskRuns(t *testing.T, sys *System) {
|
||||||
// Look for a run that doesn't exist.
|
// Look for a run that doesn't exist.
|
||||||
_, err = sys.TaskService.FindRunByID(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
|
_, err = sys.TaskService.FindRunByID(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err)
|
t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// look for a taskID that doesn't exist.
|
// look for a taskID that doesn't exist.
|
||||||
_, err = sys.TaskService.FindRunByID(sys.Ctx, influxdb.ID(math.MaxUint64), runs[0].ID)
|
_, err = sys.TaskService.FindRunByID(sys.Ctx, influxdb.ID(math.MaxUint64), runs[0].ID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err)
|
t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
foundRun0, err := sys.TaskService.FindRunByID(sys.Ctx, task.ID, runs[0].ID)
|
foundRun0, err := sys.TaskService.FindRunByID(sys.Ctx, task.ID, runs[0].ID)
|
||||||
|
@ -956,7 +956,7 @@ func testTaskConcurrency(t *testing.T, sys *System) {
|
||||||
if _, err := sys.TaskControlService.CreateNextRun(sys.Ctx, tid, math.MaxInt64>>6); err != nil { // we use the >>6 here because math.MaxInt64 is too large which causes problems when converting back and forth from time
|
if _, err := sys.TaskControlService.CreateNextRun(sys.Ctx, tid, math.MaxInt64>>6); err != nil { // we use the >>6 here because math.MaxInt64 is too large which causes problems when converting back and forth from time
|
||||||
// This may have errored due to the task being deleted. Check if the task still exists.
|
// This may have errored due to the task being deleted. Check if the task still exists.
|
||||||
|
|
||||||
if _, err2 := sys.TaskService.FindTaskByID(sys.Ctx, tid); err2 == backend.ErrTaskNotFound {
|
if _, err2 := sys.TaskService.FindTaskByID(sys.Ctx, tid); err2 == &influxdb.ErrTaskNotFound {
|
||||||
// It was deleted. Just continue.
|
// It was deleted. Just continue.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -1046,12 +1046,12 @@ func testRunStorage(t *testing.T, sys *System) {
|
||||||
|
|
||||||
// check run filter errors
|
// check run filter errors
|
||||||
_, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1})
|
_, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1})
|
||||||
if err0 != backend.ErrOutOfBoundsLimit {
|
if err0 != &influxdb.ErrOutOfBoundsLimit {
|
||||||
t.Fatalf("failed to error with out of bounds run limit: %d", -1)
|
t.Fatalf("failed to error with out of bounds run limit: %d", -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1})
|
_, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1})
|
||||||
if err1 != backend.ErrOutOfBoundsLimit {
|
if err1 != &influxdb.ErrOutOfBoundsLimit {
|
||||||
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
|
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1153,13 +1153,13 @@ func testRunStorage(t *testing.T, sys *System) {
|
||||||
_, err = sys.TaskService.FindRunByID(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
|
_, err = sys.TaskService.FindRunByID(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
|
||||||
// TODO(lh): use kv.ErrRunNotFound in the future. Our error's are not exact
|
// TODO(lh): use kv.ErrRunNotFound in the future. Our error's are not exact
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err)
|
t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// look for a taskID that doesn't exist.
|
// look for a taskID that doesn't exist.
|
||||||
_, err = sys.TaskService.FindRunByID(sys.Ctx, influxdb.ID(math.MaxUint64), runs[0].ID)
|
_, err = sys.TaskService.FindRunByID(sys.Ctx, influxdb.ID(math.MaxUint64), runs[0].ID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("expected %s but got %s instead", backend.ErrRunNotFound, err)
|
t.Fatalf("expected %s but got %s instead", &influxdb.ErrRunNotFound, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
foundRun0, err := sys.TaskService.FindRunByID(sys.Ctx, task.ID, runs[0].ID)
|
foundRun0, err := sys.TaskService.FindRunByID(sys.Ctx, task.ID, runs[0].ID)
|
||||||
|
@ -1196,7 +1196,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) {
|
||||||
// Non-existent ID should return the right error.
|
// Non-existent ID should return the right error.
|
||||||
_, err = sys.TaskService.RetryRun(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
|
_, err = sys.TaskService.RetryRun(sys.Ctx, task.ID, influxdb.ID(math.MaxUint64))
|
||||||
if !strings.Contains(err.Error(), "run not found") {
|
if !strings.Contains(err.Error(), "run not found") {
|
||||||
t.Errorf("expected retrying run that doesn't exist to return %v, got %v", backend.ErrRunNotFound, err)
|
t.Errorf("expected retrying run that doesn't exist to return %v, got %v", &influxdb.ErrRunNotFound, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
requestedAtUnix := time.Now().Add(5 * time.Minute).UTC().Unix() // This should guarantee we can make a run.
|
requestedAtUnix := time.Now().Add(5 * time.Minute).UTC().Unix() // This should guarantee we can make a run.
|
||||||
|
|
|
@ -0,0 +1,122 @@
|
||||||
|
package influxdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrRunCanceled is returned from the RunResult when a Run is Canceled. It is used mostly internally.
|
||||||
|
ErrRunCanceled = Error{
|
||||||
|
Code: EInternal,
|
||||||
|
Msg: "run canceled",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not.
|
||||||
|
ErrTaskNotClaimed = Error{
|
||||||
|
Code: EConflict,
|
||||||
|
Msg: "task not claimed",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is.
|
||||||
|
ErrTaskAlreadyClaimed = Error{
|
||||||
|
Code: EConflict,
|
||||||
|
Msg: "task already claimed",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrNoRunsFound is returned when searching for a range of runs, but none are found.
|
||||||
|
ErrNoRunsFound = Error{
|
||||||
|
Code: ENotFound,
|
||||||
|
Msg: "no matching runs found",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrInvalidTaskID error object for bad id's
|
||||||
|
ErrInvalidTaskID = Error{
|
||||||
|
Code: EInvalid,
|
||||||
|
Msg: "invalid id",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrTaskNotFound indicates no task could be found for given parameters.
|
||||||
|
ErrTaskNotFound = Error{
|
||||||
|
Code: ENotFound,
|
||||||
|
Msg: "task not found",
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrRunNotFound is returned when searching for a single run that doesn't exist.
|
||||||
|
ErrRunNotFound = Error{
|
||||||
|
Code: ENotFound,
|
||||||
|
Msg: "run not found",
|
||||||
|
}
|
||||||
|
|
||||||
|
ErrPageSizeTooSmall = Error{
|
||||||
|
Msg: "cannot have negative page limit",
|
||||||
|
Code: EInvalid,
|
||||||
|
}
|
||||||
|
|
||||||
|
ErrPageSizeTooLarge = Error{
|
||||||
|
Msg: fmt.Sprintf("cannot use page size larger then %d", MaxPageSize),
|
||||||
|
Code: EInvalid,
|
||||||
|
}
|
||||||
|
|
||||||
|
ErrOrgNotFound = Error{
|
||||||
|
Msg: "organization not found",
|
||||||
|
Code: ENotFound,
|
||||||
|
}
|
||||||
|
|
||||||
|
ErrTaskRunAlreadyQueued = Error{
|
||||||
|
Msg: "run already queued",
|
||||||
|
Code: EConflict,
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrOutOfBoundsLimit is returned with FindRuns is called with an invalid filter limit.
|
||||||
|
ErrOutOfBoundsLimit = Error{
|
||||||
|
Code: EUnprocessableEntity,
|
||||||
|
Msg: "run limit is out of bounds, must be between 1 and 500",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func ErrInternalTaskServiceError(err error) *Error {
|
||||||
|
return &Error{
|
||||||
|
Code: EInternal,
|
||||||
|
Msg: fmt.Sprintf("unexpected error in tasks; Err: %v", err),
|
||||||
|
Op: "kv/task",
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrUnexpectedTaskBucketErr a generic error we can use when we rail to retrieve a bucket
|
||||||
|
func ErrUnexpectedTaskBucketErr(err error) *Error {
|
||||||
|
return &Error{
|
||||||
|
Code: EInternal,
|
||||||
|
Msg: fmt.Sprintf("unexpected error retrieving task bucket; Err: %v", err),
|
||||||
|
Op: "kv/taskBucket",
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrTaskTimeParse an error for time parsing errors
|
||||||
|
func ErrTaskTimeParse(err error) *Error {
|
||||||
|
return &Error{
|
||||||
|
Code: EInvalid,
|
||||||
|
Msg: fmt.Sprintf("unexpected error parsing time; Err: %v", err),
|
||||||
|
Op: "kv/taskCron",
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ErrTaskOptionParse(err error) *Error {
|
||||||
|
return &Error{
|
||||||
|
Code: EInvalid,
|
||||||
|
Msg: fmt.Sprintf("invalid options; Err: %v", err),
|
||||||
|
Op: "kv/taskOptions",
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrRunNotDueYet is returned from CreateNextRun if a run is not yet due.
|
||||||
|
func ErrRunNotDueYet(dueAt int64) *Error {
|
||||||
|
return &Error{
|
||||||
|
Code: EInvalid,
|
||||||
|
Msg: fmt.Sprintf("run not due until: %v", time.Unix(dueAt, 0).UTC().Format(time.RFC3339)),
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue