feat(task): Allow tasks to run more isolated from other task systems (#15384)

* feat(task): Allow tasks to run more isolated from other task systems

To allow the task internal system to be used for user created tasks as well
as checks, notification and other future additions we needed to take 2 actions:

1 - We need to use type as a first class citizen, meaning that task's have a type
and each system that will be creating tasks will set the task type through the api.
This is a change to the previous assumption that any user could set task types. This change
will allow us to have other service's white label the task service for their own purposes and not
have to worry about colissions between the types.

2 - We needed to allow other systems to add data specific to the problem they are trying to solve.
For this purpose adding a `metadata` field to the internal task system which should allow other systems to
use the task service.

These changes will allow us in the future to allow for the current check's and notifications implementations
to create a task with meta data instead of creating a check object and a task object in the database.
By allowing this new behavior checks, notifications, and user task's can all follow the same pattern:

Field an api request in a system specific http endpoint, use a small translation to the `TaskService` function call,
translate the results to what the api expects for this system, and return results.

* fix(task): undo additional check for ownerID because check is not ready
pull/15347/head
Lyon Hill 2019-10-11 08:53:38 -06:00 committed by GitHub
parent bdba601c30
commit 3c6779f011
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 85 additions and 196 deletions

View File

@ -117,10 +117,6 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t influxdb.TaskC
return nil, influxdb.ErrInvalidOwnerID
}
if t.Type == influxdb.TaskTypeWildcard {
return nil, influxdb.ErrInvalidTaskType
}
p, err := influxdb.NewPermission(influxdb.WriteAction, influxdb.TasksResourceType, t.OrganizationID)
if err != nil {
return nil, err

View File

@ -213,25 +213,6 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
auth: &influxdb.Authorization{},
},
{
name: "create bad type",
check: func(ctx context.Context, svc influxdb.TaskService) error {
_, err := svc.CreateTask(ctx, influxdb.TaskCreate{
OrganizationID: r.Org.ID,
OwnerID: r.Auth.GetUserID(),
Type: influxdb.TaskTypeWildcard,
Flux: `option task = {
name: "my_task",
every: 1s,
}
from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
})
if err != influxdb.ErrInvalidTaskType {
return errors.New("failed to error with invalid task type")
}
return nil
},
auth: &influxdb.Authorization{},
}, {
name: "create success",
auth: r.Auth,
check: func(ctx context.Context, svc influxdb.TaskService) error {

View File

@ -9261,9 +9261,6 @@ components:
TaskCreateRequest:
type: object
properties:
type:
description: The type of task, this can be used for filtering tasks on list actions.
type: string
orgID:
description: The ID of the organization that owns this Task.
type: string

View File

@ -356,9 +356,8 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request, orgs influxdb.O
req.filter.Limit = influxdb.TaskDefaultPageSize
}
if ttype := qp.Get("type"); ttype != "" {
req.filter.Type = &ttype
}
// the task api can only create or lookup system tasks.
req.filter.Type = &influxdb.TaskSystemType
if name := qp.Get("name"); name != "" {
req.filter.Name = &name
@ -445,6 +444,9 @@ func decodePostTaskRequest(ctx context.Context, r *http.Request) (*postTaskReque
}
tc.OwnerID = auth.GetUserID()
// when creating a task we set the type so we can filter later.
tc.Type = influxdb.TaskSystemType
if err := tc.Validate(); err != nil {
return nil, err
}

View File

@ -1,101 +0,0 @@
package http_test
import (
"context"
"net/http/httptest"
"testing"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/mock"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/servicetest"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
func TestTaskService(t *testing.T) {
t.Parallel()
servicetest.TestTaskService(
t,
func(t *testing.T) (*servicetest.System, context.CancelFunc) {
service := kv.NewService(inmem.NewKVStore())
ctx, cancelFunc := context.WithCancel(context.Background())
if err := service.Initialize(ctx); err != nil {
t.Fatalf("error initializing urm service: %v", err)
}
h := http.NewAuthenticationHandler(http.ErrorHandler(0))
h.AuthorizationService = service
h.UserService = &mock.UserService{
FindUserByIDFn: func(ctx context.Context, id platform.ID) (*platform.User, error) {
return &platform.User{}, nil
},
}
th := http.NewTaskHandler(&http.TaskBackend{
HTTPErrorHandler: http.ErrorHandler(0),
Logger: zaptest.NewLogger(t).With(zap.String("handler", "task")),
TaskService: service,
AuthorizationService: service,
OrganizationService: service,
UserResourceMappingService: service,
LabelService: service,
UserService: service,
BucketService: service,
})
h.Handler = th
org := &platform.Organization{Name: t.Name() + "_org"}
if err := service.CreateOrganization(ctx, org); err != nil {
t.Fatal(err)
}
user := &platform.User{Name: t.Name() + "_user"}
if err := service.CreateUser(ctx, user); err != nil {
t.Fatal(err)
}
auth := platform.Authorization{UserID: user.ID, OrgID: org.ID}
if err := service.CreateAuthorization(ctx, &auth); err != nil {
t.Fatal(err)
}
server := httptest.NewServer(h)
go func() {
<-ctx.Done()
server.Close()
}()
taskService := http.TaskService{
Addr: server.URL,
Token: auth.Token,
}
cFunc := func(t *testing.T) (servicetest.TestCreds, error) {
org := &platform.Organization{Name: t.Name() + "_org"}
if err := service.CreateOrganization(ctx, org); err != nil {
t.Fatal(err)
}
return servicetest.TestCreds{
OrgID: org.ID,
Org: org.Name,
UserID: user.ID,
AuthorizationID: auth.ID,
Token: auth.Token,
}, nil
}
return &servicetest.System{
TaskControlService: service,
TaskService: taskService,
I: service,
Ctx: ctx,
CredsFunc: cFunc,
}, cancelFunc
},
"transactional",
)
}

View File

@ -286,15 +286,9 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
continue
}
if filter.Type == nil {
ft := ""
filter.Type = &ft
if taskFilterMatch(filter.Type, task.Type) {
ts = append(ts, task)
}
if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != task.Type {
continue
}
ts = append(ts, task)
if len(ts) >= filter.Limit {
break
@ -369,13 +363,7 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.Tas
}
if t != nil {
typ := ""
if filter.Type != nil {
typ = *filter.Type
}
// if the filter type matches task type or filter type is a wildcard
if typ == t.Type || typ == influxdb.TaskTypeWildcard {
if taskFilterMatch(filter.Type, t.Type) {
ts = append(ts, t)
}
}
@ -412,11 +400,7 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.Tas
break
}
if filter.Type == nil {
ft := ""
filter.Type = &ft
}
if *filter.Type != influxdb.TaskTypeWildcard && *filter.Type != t.Type {
if !taskFilterMatch(filter.Type, t.Type) {
continue
}
@ -480,8 +464,13 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
} else {
t.LatestCompleted = t.CreatedAt
}
// insert the new task into the list
ts = append(ts, t)
if t != nil {
if taskFilterMatch(filter.Type, t.Type) {
ts = append(ts, t)
}
}
}
// if someone has a limit of 1
@ -507,6 +496,11 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
} else {
t.LatestCompleted = t.CreatedAt
}
if !taskFilterMatch(filter.Type, t.Type) {
continue
}
// insert the new task into the list
ts = append(ts, t)
@ -594,6 +588,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
OrganizationID: org.ID,
Organization: org.Name,
OwnerID: tc.OwnerID,
Metadata: tc.Metadata,
Name: opt.Name,
Description: tc.Description,
Status: tc.Status,
@ -735,6 +730,11 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
}
if upd.Metadata != nil {
task.Metadata = upd.Metadata
task.UpdatedAt = updatedAt
}
if upd.LatestCompleted != nil {
// make sure we only update latest completed one way
tlc, _ := time.Parse(time.RFC3339, task.LatestCompleted)
@ -1901,3 +1901,19 @@ func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) {
return []byte(string(encodedID) + "/" + string(encodedRunID)), nil
}
func taskFilterMatch(filter *string, ttype string) bool {
// if they want a system task the record may be system or an empty string
if filter != nil {
// if the task is either "system" or "" it qaulifies as a system task
if *filter == influxdb.TaskSystemType && (ttype == influxdb.TaskSystemType || ttype == "") {
return true
}
// otherwise check task type against the filter
if *filter != ttype {
return false
}
}
return true
}

59
task.go
View File

@ -20,29 +20,33 @@ const (
TaskStatusActive = "active"
TaskStatusInactive = "inactive"
)
TaskTypeWildcard = "*"
var (
// TaskSystemType is the type set in tasks' for all crud requests
TaskSystemType = "system"
)
// Task is a task. 🎊
type Task struct {
ID ID `json:"id"`
Type string `json:"type,omitempty"`
OrganizationID ID `json:"orgID"`
Organization string `json:"org"`
AuthorizationID ID `json:"-"`
Authorization *Authorization `json:"-"`
OwnerID ID `json:"ownerID"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Status string `json:"status"`
Flux string `json:"flux"`
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
Offset string `json:"offset,omitempty"`
LatestCompleted string `json:"latestCompleted,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
UpdatedAt string `json:"updatedAt,omitempty"`
ID ID `json:"id"`
Type string `json:"type,omitempty"`
OrganizationID ID `json:"orgID"`
Organization string `json:"org"`
AuthorizationID ID `json:"-"`
Authorization *Authorization `json:"-"`
OwnerID ID `json:"ownerID"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Status string `json:"status"`
Flux string `json:"flux"`
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
Offset string `json:"offset,omitempty"`
LatestCompleted string `json:"latestCompleted,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
UpdatedAt string `json:"updatedAt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// EffectiveCron returns the effective cron string of the options.
@ -156,13 +160,14 @@ type TaskService interface {
// TaskCreate is the set of values to create a task.
type TaskCreate struct {
Type string `json:"type,omitempty"`
Flux string `json:"flux"`
Description string `json:"description,omitempty"`
Status string `json:"status,omitempty"`
OrganizationID ID `json:"orgID,omitempty"`
Organization string `json:"org,omitempty"`
OwnerID ID `json:"-"`
Type string `json:"type,omitempty"`
Flux string `json:"flux"`
Description string `json:"description,omitempty"`
Status string `json:"status,omitempty"`
OrganizationID ID `json:"orgID,omitempty"`
Organization string `json:"org,omitempty"`
OwnerID ID `json:"-"`
Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.
}
func (t TaskCreate) Validate() error {
@ -184,10 +189,12 @@ type TaskUpdate struct {
Description *string `json:"description,omitempty"`
// LatestCompleted us to set latest completed on startup to skip task catchup
LatestCompleted *string `json:"-"`
LatestCompleted *string `json:"-"`
Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.
// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
Options options.Options // when we unmarshal this gets unmarshalled from flat key-values
}
func (t *TaskUpdate) UnmarshalJSON(data []byte) error {

View File

@ -28,8 +28,7 @@ type Coordinator interface {
// each task it calls the provided coordinators task created method
func NotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, coord Coordinator, logger *zap.Logger) error {
// If we missed a Create Action
wildcard := influxdb.TaskTypeWildcard
tasks, _, err := ts.FindTasks(ctx, influxdb.TaskFilter{Type: &wildcard})
tasks, _, err := ts.FindTasks(ctx, influxdb.TaskFilter{})
if err != nil {
return err
}
@ -53,7 +52,6 @@ func NotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, coord Coor
}
tasks, _, err = ts.FindTasks(ctx, influxdb.TaskFilter{
Type: &wildcard,
After: &tasks[len(tasks)-1].ID,
})
if err != nil {

View File

@ -55,10 +55,6 @@ func Test_NotifyCoordinatorOfCreated(t *testing.T) {
t.Errorf("expected nil, found %q", err)
}
if *tasks.filter.Type != influxdb.TaskTypeWildcard {
t.Error("expected wildcard type filter")
}
if diff := cmp.Diff([]update{
{two, influxdb.TaskUpdate{LatestCompleted: &aTimeStamp}},
}, tasks.updates); diff != "" {

View File

@ -166,6 +166,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
OwnerID: cr.UserID,
Type: influxdb.TaskSystemType,
}
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
@ -247,6 +248,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
Offset: "5s",
Status: string(backend.DefaultTaskStatus),
Flux: fmt.Sprintf(scriptFmt, 0),
Type: influxdb.TaskSystemType,
}
for fn, f := range found {
if diff := cmp.Diff(f, want); diff != "" {
@ -1530,15 +1532,15 @@ func testTaskType(t *testing.T, sys *System) {
t.Fatal("no task ID set")
}
// get default tasks
tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID})
// get system tasks (or task's with no type)
tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &influxdb.TaskSystemType})
if err != nil {
t.Fatal(err)
}
for _, task := range tasks {
if task.Type != "" {
t.Fatal("recieved a task with a type when sending no type restriction")
if task.Type != "" && task.Type != influxdb.TaskSystemType {
t.Fatal("received a task with a type when sending no type restriction")
}
}
@ -1549,12 +1551,12 @@ func testTaskType(t *testing.T, sys *System) {
}
if len(tasks) != 1 {
fmt.Printf("tasks: %+v\n", tasks)
t.Fatalf("failed to return tasks by type, expected 1, got %d", len(tasks))
}
// get all tasks
wc := influxdb.TaskTypeWildcard
tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID, Type: &wc})
tasks, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{OrganizationID: &cr.OrgID})
if err != nil {
t.Fatal(err)
}

View File

@ -36,11 +36,6 @@ var (
Msg: "invalid id",
}
// ErrInvalidTaskType error object for bad id's
ErrInvalidTaskType = &Error{
Code: EInvalid,
Msg: "invalid task type",
}
// ErrTaskNotFound indicates no task could be found for given parameters.
ErrTaskNotFound = &Error{
Code: ENotFound,