feat(task): add task types (#14567)
parent
17de20e80f
commit
e922c8a26f
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
### Features
|
||||
1. [14495](https://github.com/influxdata/influxdb/pull/14495): optional gzip compression of the query CSV response.
|
||||
1. [14567](https://github.com/influxdata/influxdb/pull/14567): Add task types.
|
||||
|
||||
### UI Improvements
|
||||
|
||||
|
|
|
|||
|
|
@ -122,6 +122,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC
|
|||
return nil, influxdb.ErrMissingToken
|
||||
}
|
||||
|
||||
if t.Type == influxdb.TaskTypeWildcard {
|
||||
return nil, influxdb.ErrInvalidTaskType
|
||||
}
|
||||
|
||||
p, err := platform.NewPermission(platform.WriteAction, platform.TasksResourceType, t.OrganizationID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -216,6 +216,25 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
|
|||
auth: &influxdb.Authorization{},
|
||||
},
|
||||
{
|
||||
name: "create bad type",
|
||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||
_, err := svc.CreateTask(ctx, influxdb.TaskCreate{
|
||||
OrganizationID: r.Org.ID,
|
||||
Token: r.Auth.Token,
|
||||
Type: influxdb.TaskTypeWildcard,
|
||||
Flux: `option task = {
|
||||
name: "my_task",
|
||||
every: 1s,
|
||||
}
|
||||
from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
|
||||
})
|
||||
if err != influxdb.ErrInvalidTaskType {
|
||||
return errors.New("failed to error with invalid task type")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
auth: &influxdb.Authorization{},
|
||||
}, {
|
||||
name: "create success",
|
||||
auth: r.Auth,
|
||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||
|
|
@ -232,7 +251,7 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "create badbucket",
|
||||
name: "create bad bucket",
|
||||
auth: r.Auth,
|
||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||
var (
|
||||
|
|
|
|||
|
|
@ -6556,6 +6556,9 @@ components:
|
|||
id:
|
||||
readOnly: true
|
||||
type: string
|
||||
type:
|
||||
description: The type of task, this can be used for filtering tasks on list actions.
|
||||
type: string
|
||||
orgID:
|
||||
description: The ID of the organization that owns this Task.
|
||||
type: string
|
||||
|
|
@ -8816,6 +8819,9 @@ components:
|
|||
TaskCreateRequest:
|
||||
type: object
|
||||
properties:
|
||||
type:
|
||||
description: The type of task, this can be used for filtering tasks on list actions.
|
||||
type: string
|
||||
orgID:
|
||||
description: The ID of the organization that owns this Task.
|
||||
type: string
|
||||
|
|
|
|||
|
|
@ -358,6 +358,10 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request, orgs platform.O
|
|||
req.filter.Limit = platform.TaskDefaultPageSize
|
||||
}
|
||||
|
||||
if ttype := qp.Get("type"); ttype != "" {
|
||||
req.filter.Type = &ttype
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
|
|
@ -1309,6 +1313,10 @@ func (t TaskService) FindTasks(ctx context.Context, filter platform.TaskFilter)
|
|||
val.Add("limit", strconv.Itoa(filter.Limit))
|
||||
}
|
||||
|
||||
if filter.Type != nil {
|
||||
val.Add("type", *filter.Type)
|
||||
}
|
||||
|
||||
u.RawQuery = val.Encode()
|
||||
|
||||
req, err := http.NewRequest("GET", u.String(), nil)
|
||||
|
|
|
|||
|
|
@ -67,7 +67,12 @@ func TestTaskService(t *testing.T) {
|
|||
Token: auth.Token,
|
||||
}
|
||||
|
||||
cFunc := func() (servicetest.TestCreds, error) {
|
||||
cFunc := func(t *testing.T) (servicetest.TestCreds, error) {
|
||||
org := &platform.Organization{Name: t.Name() + "_org"}
|
||||
if err := service.CreateOrganization(ctx, org); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return servicetest.TestCreds{
|
||||
OrgID: org.ID,
|
||||
Org: org.Name,
|
||||
|
|
|
|||
27
kv/task.go
27
kv/task.go
|
|
@ -227,6 +227,14 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
|
|||
continue
|
||||
}
|
||||
|
||||
if filter.Type == nil {
|
||||
ft := ""
|
||||
filter.Type = &ft
|
||||
}
|
||||
if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != task.Type {
|
||||
continue
|
||||
}
|
||||
|
||||
ts = append(ts, task)
|
||||
|
||||
if len(ts) >= filter.Limit {
|
||||
|
|
@ -296,10 +304,16 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
|
|||
return nil, 0, err
|
||||
}
|
||||
|
||||
// insert the new task into the list
|
||||
if t != nil {
|
||||
ts = append(ts, t)
|
||||
typ := ""
|
||||
if filter.Type != nil {
|
||||
typ = *filter.Type
|
||||
}
|
||||
|
||||
// if the filter type matches task type or filter type is a wildcard
|
||||
if typ == t.Type || typ == influxdb.TaskTypeWildcard {
|
||||
ts = append(ts, t)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -334,6 +348,14 @@ func (s *Service) findTaskByOrg(ctx context.Context, tx Tx, filter influxdb.Task
|
|||
break
|
||||
}
|
||||
|
||||
if filter.Type == nil {
|
||||
ft := ""
|
||||
filter.Type = &ft
|
||||
}
|
||||
if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != t.Type {
|
||||
continue
|
||||
}
|
||||
|
||||
// insert the new task into the list
|
||||
ts = append(ts, t)
|
||||
|
||||
|
|
@ -491,6 +513,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
|
|||
createdAt := time.Now().UTC().Format(time.RFC3339)
|
||||
task := &influxdb.Task{
|
||||
ID: s.IDGenerator.ID(),
|
||||
Type: tc.Type,
|
||||
OrganizationID: org.ID,
|
||||
Organization: org.Name,
|
||||
AuthorizationID: auth.Identifier(),
|
||||
|
|
|
|||
5
task.go
5
task.go
|
|
@ -20,11 +20,14 @@ const (
|
|||
|
||||
TaskStatusActive = "active"
|
||||
TaskStatusInactive = "inactive"
|
||||
|
||||
TaskTypeWildcard = "*"
|
||||
)
|
||||
|
||||
// Task is a task. 🎊
|
||||
type Task struct {
|
||||
ID ID `json:"id"`
|
||||
Type string `json:"type,omitempty"`
|
||||
OrganizationID ID `json:"orgID"`
|
||||
Organization string `json:"org"`
|
||||
AuthorizationID ID `json:"authorizationID"`
|
||||
|
|
@ -134,6 +137,7 @@ type TaskService interface {
|
|||
|
||||
// TaskCreate is the set of values to create a task.
|
||||
type TaskCreate struct {
|
||||
Type string `json:"type,omitempty"`
|
||||
Flux string `json:"flux"`
|
||||
Description string `json:"description,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
|
|
@ -374,6 +378,7 @@ func (t *TaskUpdate) UpdateFlux(oldFlux string) error {
|
|||
|
||||
// TaskFilter represents a set of filters that restrict the returned results
|
||||
type TaskFilter struct {
|
||||
Type *string
|
||||
After *ID
|
||||
OrganizationID *ID
|
||||
Organization string
|
||||
|
|
|
|||
|
|
@ -83,6 +83,12 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory, testCategory ...s
|
|||
t.Parallel()
|
||||
testManualRun(t, sys)
|
||||
})
|
||||
|
||||
t.Run("Task Type", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
testTaskType(t, sys)
|
||||
})
|
||||
|
||||
})
|
||||
case "analytical":
|
||||
t.Run("AnalyticalTaskService", func(t *testing.T) {
|
||||
|
|
@ -149,7 +155,7 @@ type System struct {
|
|||
// However, if the system needs to verify credentials,
|
||||
// the caller should set this value and return valid IDs and a valid token.
|
||||
// It is safe if this returns the same values every time it is called.
|
||||
CredsFunc func() (TestCreds, error)
|
||||
CredsFunc func(*testing.T) (TestCreds, error)
|
||||
}
|
||||
|
||||
func testTaskCRUD(t *testing.T, sys *System) {
|
||||
|
|
@ -1405,7 +1411,7 @@ func creds(t *testing.T, s *System) TestCreds {
|
|||
}
|
||||
}
|
||||
|
||||
c, err := s.CredsFunc()
|
||||
c, err := s.CredsFunc(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
@ -1437,3 +1443,86 @@ option task = {
|
|||
from(bucket: "b")
|
||||
|> http.to(url: "http://example.com")`
|
||||
)
|
||||
|
||||
func testTaskType(t *testing.T, sys *System) {
|
||||
cr := creds(t, sys)
|
||||
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
|
||||
|
||||
// Create a tasks
|
||||
ts := influxdb.TaskCreate{
|
||||
OrganizationID: cr.OrgID,
|
||||
Flux: fmt.Sprintf(scriptFmt, 0),
|
||||
Token: cr.Token,
|
||||
}
|
||||
|
||||
tsk, err := sys.TaskService.CreateTask(authorizedCtx, ts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !tsk.ID.Valid() {
|
||||
t.Fatal("no task ID set")
|
||||
}
|
||||
|
||||
tc := influxdb.TaskCreate{
|
||||
Type: "cows",
|
||||
OrganizationID: cr.OrgID,
|
||||
Flux: fmt.Sprintf(scriptFmt, 0),
|
||||
Token: cr.Token,
|
||||
}
|
||||
|
||||
tskCow, err := sys.TaskService.CreateTask(authorizedCtx, tc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !tskCow.ID.Valid() {
|
||||
t.Fatal("no task ID set")
|
||||
}
|
||||
|
||||
tp := influxdb.TaskCreate{
|
||||
Type: "pigs",
|
||||
OrganizationID: cr.OrgID,
|
||||
Flux: fmt.Sprintf(scriptFmt, 0),
|
||||
Token: cr.Token,
|
||||
}
|
||||
|
||||
tskPig, err := sys.TaskService.CreateTask(authorizedCtx, tp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !tskPig.ID.Valid() {
|
||||
t.Fatal("no task ID set")
|
||||
}
|
||||
|
||||
// get default tasks
|
||||
tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, task := range tasks {
|
||||
if task.Type != "" {
|
||||
t.Fatal("recieved a task with a type when sending no type restriction")
|
||||
}
|
||||
}
|
||||
|
||||
// get filtered tasks
|
||||
tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &tc.Type})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(tasks) != 1 {
|
||||
t.Fatalf("failed to return tasks by type, expected 1, got %d", len(tasks))
|
||||
}
|
||||
|
||||
// get all tasks
|
||||
wc := influxdb.TaskTypeWildcard
|
||||
tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &wc})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(tasks) != 3 {
|
||||
t.Fatalf("failed to return tasks with wildcard, expected 3, got %d", len(tasks))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,11 @@ var (
|
|||
Msg: "invalid id",
|
||||
}
|
||||
|
||||
// ErrInvalidTaskType error object for bad id's
|
||||
ErrInvalidTaskType = &Error{
|
||||
Code: EInvalid,
|
||||
Msg: "invalid task type",
|
||||
}
|
||||
// ErrTaskNotFound indicates no task could be found for given parameters.
|
||||
ErrTaskNotFound = &Error{
|
||||
Code: ENotFound,
|
||||
|
|
|
|||
Loading…
Reference in New Issue