feat(task): Remove token's from task structures (#14712)
* feat(task): Remove token's from task structures We had previously removed token's from the task api but left the token in place in several locations in the stack. Now we can cleanly remove the extra tokens.pull/14719/head
parent
06a76374ca
commit
0b247cce5b
|
@ -118,10 +118,9 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC
|
|||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
// TODO: add owner check
|
||||
// if t.Token == "" {
|
||||
// return nil, influxdb.ErrMissingToken
|
||||
// }
|
||||
if !t.OwnerID.Valid() {
|
||||
return nil, influxdb.ErrInvalidOwnerID
|
||||
}
|
||||
|
||||
if t.Type == influxdb.TaskTypeWildcard {
|
||||
return nil, influxdb.ErrInvalidTaskType
|
||||
|
|
|
@ -39,7 +39,7 @@ func TestOnboardingValidation(t *testing.T) {
|
|||
|
||||
_, err = ts.CreateTask(ctx, influxdb.TaskCreate{
|
||||
OrganizationID: r.Org.ID,
|
||||
Token: r.Auth.Token,
|
||||
OwnerID: r.Auth.GetUserID(),
|
||||
Flux: `option task = {
|
||||
name: "my_task",
|
||||
every: 1s,
|
||||
|
@ -220,7 +220,7 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
|
|||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||
_, err := svc.CreateTask(ctx, influxdb.TaskCreate{
|
||||
OrganizationID: r.Org.ID,
|
||||
Token: r.Auth.Token,
|
||||
OwnerID: r.Auth.GetUserID(),
|
||||
Type: influxdb.TaskTypeWildcard,
|
||||
Flux: `option task = {
|
||||
name: "my_task",
|
||||
|
@ -240,7 +240,7 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
|
|||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||
_, err := svc.CreateTask(ctx, influxdb.TaskCreate{
|
||||
OrganizationID: r.Org.ID,
|
||||
Token: r.Auth.Token,
|
||||
OwnerID: r.Auth.GetUserID(),
|
||||
Flux: `option task = {
|
||||
name: "my_task",
|
||||
every: 1s,
|
||||
|
@ -260,7 +260,7 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`,
|
|||
errfmt = "expected %q, got %q"
|
||||
_, err = svc.CreateTask(ctx, influxdb.TaskCreate{
|
||||
OrganizationID: r.Org.ID,
|
||||
Token: r.Auth.Token,
|
||||
OwnerID: r.Auth.GetUserID(),
|
||||
Flux: `option task = {
|
||||
name: "my_task",
|
||||
every: 1s,
|
||||
|
@ -308,7 +308,7 @@ from(bucket:"bad") |> range(start:-5m) |> to(bucket:"bad", org:"thing")`,
|
|||
errfmt = "expected %q, got %q"
|
||||
_, err = svc.CreateTask(ctx, influxdb.TaskCreate{
|
||||
OrganizationID: r.Org.ID,
|
||||
Token: r.Auth.Token,
|
||||
OwnerID: r.Auth.GetUserID(),
|
||||
Flux: `option task = {
|
||||
name: "my_task",
|
||||
every: 1s,
|
||||
|
|
|
@ -8840,10 +8840,7 @@ components:
|
|||
flux:
|
||||
description: The Flux script to run for this task.
|
||||
type: string
|
||||
token:
|
||||
description: The token to use for authenticating this task when it executes queries.
|
||||
type: string
|
||||
required: [flux, token]
|
||||
required: [flux]
|
||||
TaskUpdateRequest:
|
||||
type: object
|
||||
properties:
|
||||
|
@ -8864,9 +8861,6 @@ components:
|
|||
offset:
|
||||
description: Override the 'offset' option in the flux script.
|
||||
type: string
|
||||
token:
|
||||
description: Override the existing token associated with the task.
|
||||
type: string
|
||||
Check:
|
||||
oneOf:
|
||||
- $ref: "#/components/schemas/DeadmanCheck"
|
||||
|
|
|
@ -448,7 +448,6 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
|
|||
args: args{
|
||||
taskCreate: platform.TaskCreate{
|
||||
OrganizationID: 1,
|
||||
Token: "mytoken",
|
||||
Flux: "abc",
|
||||
},
|
||||
},
|
||||
|
@ -498,7 +497,6 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
|
|||
args: args{
|
||||
taskCreate: platform.TaskCreate{
|
||||
OrganizationID: 1,
|
||||
Token: "mytoken",
|
||||
Flux: "abc",
|
||||
},
|
||||
},
|
||||
|
@ -530,7 +528,6 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
|
|||
args: args{
|
||||
taskCreate: platform.TaskCreate{
|
||||
OrganizationID: 1,
|
||||
Token: "mytoken",
|
||||
Flux: "abc",
|
||||
},
|
||||
},
|
||||
|
@ -1269,7 +1266,6 @@ func TestTaskHandler_CreateTaskWithOrgName(t *testing.T) {
|
|||
b, err := json.Marshal(platform.TaskCreate{
|
||||
Flux: script,
|
||||
Organization: o.Name,
|
||||
Token: authz.Token,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
15
kv/task.go
15
kv/task.go
|
@ -542,6 +542,12 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
|
|||
return nil, influxdb.ErrOrgNotFound
|
||||
}
|
||||
|
||||
// TODO: Uncomment this once the checks/notifications no longer create tasks in kv
|
||||
// confirm the owner is a real user.
|
||||
// if _, err = s.findUserByID(ctx, tx, tc.OwnerID); err != nil {
|
||||
// return nil, influxdb.ErrInvalidOwnerID
|
||||
// }
|
||||
|
||||
opt, err := options.FromScript(tc.Flux)
|
||||
if err != nil {
|
||||
return nil, influxdb.ErrTaskOptionParse(err)
|
||||
|
@ -682,15 +688,6 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
|
|||
}
|
||||
}
|
||||
|
||||
// update the Token
|
||||
if upd.Token != "" {
|
||||
auth, err := s.findAuthorizationByToken(ctx, tx, upd.Token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task.AuthorizationID = auth.ID
|
||||
}
|
||||
|
||||
if upd.Description != nil {
|
||||
task.Description = *upd.Description
|
||||
}
|
||||
|
|
13
task.go
13
task.go
|
@ -145,7 +145,6 @@ type TaskCreate struct {
|
|||
Status string `json:"status,omitempty"`
|
||||
OrganizationID ID `json:"orgID,omitempty"`
|
||||
Organization string `json:"org,omitempty"`
|
||||
Token string `json:"-"`
|
||||
OwnerID ID `json:"-"`
|
||||
}
|
||||
|
||||
|
@ -172,9 +171,6 @@ type TaskUpdate struct {
|
|||
|
||||
// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
|
||||
Options options.Options // when we unmarshal this gets unmarshalled from flat key-values
|
||||
|
||||
// Optional token override.
|
||||
Token string `json:"token,omitempty"`
|
||||
}
|
||||
|
||||
func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
|
||||
|
@ -199,8 +195,6 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
|
|||
Concurrency *int64 `json:"concurrency,omitempty"`
|
||||
|
||||
Retry *int64 `json:"retry,omitempty"`
|
||||
|
||||
Token string `json:"token,omitempty"`
|
||||
}{}
|
||||
|
||||
if err := json.Unmarshal(data, &jo); err != nil {
|
||||
|
@ -218,8 +212,6 @@ func (t *TaskUpdate) UnmarshalJSON(data []byte) error {
|
|||
t.Options.Retry = jo.Retry
|
||||
t.Flux = jo.Flux
|
||||
t.Status = jo.Status
|
||||
t.Token = jo.Token
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -242,8 +234,6 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) {
|
|||
Concurrency *int64 `json:"concurrency,omitempty"`
|
||||
|
||||
Retry *int64 `json:"retry,omitempty"`
|
||||
|
||||
Token string `json:"token,omitempty"`
|
||||
}{}
|
||||
jo.Name = t.Options.Name
|
||||
jo.Cron = t.Options.Cron
|
||||
|
@ -257,7 +247,6 @@ func (t TaskUpdate) MarshalJSON() ([]byte, error) {
|
|||
jo.Retry = t.Options.Retry
|
||||
jo.Flux = t.Flux
|
||||
jo.Status = t.Status
|
||||
jo.Token = t.Token
|
||||
return json.Marshal(jo)
|
||||
}
|
||||
|
||||
|
@ -265,7 +254,7 @@ func (t TaskUpdate) Validate() error {
|
|||
switch {
|
||||
case !t.Options.Every.IsZero() && t.Options.Cron != "":
|
||||
return errors.New("cannot specify both every and cron")
|
||||
case t.Flux == nil && t.Status == nil && t.Options.IsZero() && t.Token == "":
|
||||
case t.Flux == nil && t.Status == nil && t.Options.IsZero():
|
||||
return errors.New("cannot update task without content")
|
||||
case t.Status != nil && *t.Status != TaskStatusActive && *t.Status != TaskStatusInactive:
|
||||
return fmt.Errorf("invalid task status: %q", *t.Status)
|
||||
|
|
|
@ -119,7 +119,7 @@ func TestCoordinator(t *testing.T) {
|
|||
releaseChan := sched.TaskReleaseChan()
|
||||
updateChan := sched.TaskUpdateChan()
|
||||
|
||||
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
|
||||
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ func TestCoordinator(t *testing.T) {
|
|||
t.Fatal("task sent to scheduler doesn't match task created")
|
||||
}
|
||||
|
||||
task, err = coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
|
||||
task, err = coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ func TestCoordinator_ClaimTaskUpdatesLatestCompleted(t *testing.T) {
|
|||
|
||||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
|
||||
|
||||
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
|
||||
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -266,7 +266,7 @@ func TestCoordinator_DeleteUnclaimedTask(t *testing.T) {
|
|||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
|
||||
|
||||
// Create an isolated task directly through the store so the coordinator doesn't know about it.
|
||||
task, err := ts.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
|
||||
task, err := ts.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -296,7 +296,7 @@ func TestCoordinator_ClaimExistingTasks(t *testing.T) {
|
|||
|
||||
createdIDs := make([]platform.ID, numTasks)
|
||||
for i := 0; i < numTasks; i++ {
|
||||
ctr := platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script}
|
||||
ctr := platform.TaskCreate{OrganizationID: 1, Flux: script}
|
||||
if i == inactiveTaskIndex {
|
||||
ctr.Status = string(backend.TaskInactive)
|
||||
}
|
||||
|
@ -362,7 +362,7 @@ func TestCoordinator_ForceRun(t *testing.T) {
|
|||
coord := coordinator.New(zaptest.NewLogger(t), sched, ts, coordinator.WithoutExistingTasks())
|
||||
|
||||
// Create an isolated task directly through the store so the coordinator doesn't know about it.
|
||||
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
|
||||
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Flux: script})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -79,10 +79,10 @@ var (
|
|||
Msg: "run limit is out of bounds, must be between 1 and 500",
|
||||
}
|
||||
|
||||
// ErrMissingToken is called when trying to create a Task without providing a token
|
||||
ErrMissingToken = &Error{
|
||||
// ErrInvalidOwnerID is called when trying to create a task with out a valid ownerID
|
||||
ErrInvalidOwnerID = &Error{
|
||||
Code: EInvalid,
|
||||
Msg: "cannot create task without valid token",
|
||||
Msg: "cannot create task with invalid ownerID",
|
||||
}
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue