fix(task): create authorizations for tasks, which can read their task
Also set the generated token's description while we're here. This enables us to use task's Authorization when we need to query the system bucket to get run logs, etc. but we only have a Session.pull/12096/head
parent
c78477314a
commit
12a604172f
|
@ -19,6 +19,7 @@ import (
|
|||
pcontext "github.com/influxdata/influxdb/context"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/task/backend"
|
||||
"github.com/influxdata/influxdb/task/options"
|
||||
"github.com/julienschmidt/httprouter"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -369,44 +370,98 @@ func decodeGetTasksRequest(ctx context.Context, r *http.Request) (*getTasksReque
|
|||
return req, nil
|
||||
}
|
||||
|
||||
func (h *TaskHandler) createTaskAuthorizationIfNotExists(ctx context.Context, a platform.Authorizer, t *platform.TaskCreate) error {
|
||||
// createBootstrapTaskAuthorizationIfNotExists checks if a the task create request hasn't specified a token, and if the request came from a session,
|
||||
// and if both of those are true, it creates an authorization and return it.
|
||||
//
|
||||
// Note that the created authorization will have permissions required for the task,
|
||||
// but it won't have permissions to read the task, as we don't have the task ID yet.
|
||||
//
|
||||
// This method may return a nil error and a nil authorization, if there wasn't a need to create an authorization.
|
||||
func (h *TaskHandler) createBootstrapTaskAuthorizationIfNotExists(ctx context.Context, a platform.Authorizer, t *platform.TaskCreate) (*platform.Authorization, error) {
|
||||
if t.Token != "" {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
s, ok := a.(*platform.Session)
|
||||
if !ok {
|
||||
// If an authorization was used continue
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
spec, err := flux.Compile(ctx, t.Flux, time.Now())
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
preAuthorizer := query.NewPreAuthorizer(h.BucketService)
|
||||
ps, err := preAuthorizer.RequiredPermissions(ctx, spec)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := authorizer.VerifyPermissions(ctx, ps); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts, err := options.FromScript(t.Flux)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
auth := &platform.Authorization{
|
||||
OrgID: t.OrganizationID,
|
||||
UserID: s.UserID,
|
||||
Permissions: ps,
|
||||
Description: fmt.Sprintf("bootstrap authorization for task %q", opts.Name),
|
||||
}
|
||||
|
||||
if err := h.AuthorizationService.CreateAuthorization(ctx, auth); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t.Token = auth.Token
|
||||
|
||||
return auth, nil
|
||||
}
|
||||
|
||||
func (h *TaskHandler) finalizeBootstrappedTaskAuthorization(ctx context.Context, bootstrap *platform.Authorization, task *platform.Task) error {
|
||||
// If we created a bootstrapped authorization for a task,
|
||||
// we need to replace it with a new authorization that allows read access on the task.
|
||||
// Unfortunately for this case, updating authorizations is not allowed.
|
||||
readTaskPerm, err := platform.NewPermissionAtID(task.ID, platform.ReadAction, platform.TasksResourceType, bootstrap.OrgID)
|
||||
if err != nil {
|
||||
// We should never fail to create a new permission like this.
|
||||
return err
|
||||
}
|
||||
authzWithTask := &platform.Authorization{
|
||||
UserID: bootstrap.UserID,
|
||||
OrgID: bootstrap.OrgID,
|
||||
Permissions: append([]platform.Permission{*readTaskPerm}, bootstrap.Permissions...),
|
||||
Description: fmt.Sprintf("auto-generated authorization for task %q", task.Name),
|
||||
}
|
||||
|
||||
if err := h.AuthorizationService.CreateAuthorization(ctx, authzWithTask); err != nil {
|
||||
h.logger.Warn("Failed to finalize bootstrap authorization", zap.String("taskID", task.ID.String()))
|
||||
// The task exists with an authorization that can't read the task.
|
||||
return err
|
||||
}
|
||||
|
||||
// Assign the new authorization...
|
||||
u, err := h.TaskService.UpdateTask(ctx, task.ID, platform.TaskUpdate{Token: authzWithTask.Token})
|
||||
if err != nil {
|
||||
h.logger.Warn("Failed to assign finalized authorization", zap.String("authorizationID", bootstrap.ID.String()), zap.String("taskID", task.ID.String()))
|
||||
// The task exists with an authorization that can't read the task,
|
||||
// and we've created a new authorization for the task but not assigned it.
|
||||
return err
|
||||
}
|
||||
*task = *u
|
||||
|
||||
// .. and delete the old one.
|
||||
if err := h.AuthorizationService.DeleteAuthorization(ctx, bootstrap.ID); err != nil {
|
||||
// Since this is the last thing we're doing, just log it if we fail to delete for some reason.
|
||||
h.logger.Warn("Failed to delete bootstrap authorization", zap.String("authorizationID", bootstrap.ID.String()), zap.String("taskID", task.ID.String()))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -453,7 +508,8 @@ func (h *TaskHandler) handlePostTask(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if err := h.createTaskAuthorizationIfNotExists(ctx, auth, &req.TaskCreate); err != nil {
|
||||
bootstrapAuthz, err := h.createBootstrapTaskAuthorizationIfNotExists(ctx, auth, &req.TaskCreate)
|
||||
if err != nil {
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
@ -471,6 +527,20 @@ func (h *TaskHandler) handlePostTask(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if bootstrapAuthz != nil {
|
||||
// There was a bootstrapped authorization for this task.
|
||||
// Now we need to apply the final authorization for the task.
|
||||
if err := h.finalizeBootstrappedTaskAuthorization(ctx, bootstrapAuthz, task); err != nil {
|
||||
err = &platform.Error{
|
||||
Err: err,
|
||||
Msg: fmt.Sprintf("successfully created task with ID %s, but failed to finalize bootstrap token for task", task.ID.String()),
|
||||
Code: platform.EInternal,
|
||||
}
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := encodeResponse(ctx, w, http.StatusCreated, newTaskResponse(*task, []*platform.Label{})); err != nil {
|
||||
logEncodingError(h.logger, r, err)
|
||||
return
|
||||
|
|
|
@ -900,16 +900,27 @@ func TestService_handlePostTaskLabel(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTaskHandler_CreateTaskFromSession(t *testing.T) {
|
||||
i := inmem.NewService()
|
||||
|
||||
taskID := platform.ID(9)
|
||||
var createdTasks []platform.TaskCreate
|
||||
ts := &mock.TaskService{
|
||||
CreateTaskFn: func(_ context.Context, tc platform.TaskCreate) (*platform.Task, error) {
|
||||
createdTasks = append(createdTasks, tc)
|
||||
// Task with fake IDs so it can be serialized.
|
||||
return &platform.Task{ID: 9, OrganizationID: 99, AuthorizationID: 999}, nil
|
||||
return &platform.Task{ID: taskID, OrganizationID: 99, AuthorizationID: 999, Name: "x"}, nil
|
||||
},
|
||||
// Needed due to task authorization bootstrapping.
|
||||
UpdateTaskFn: func(ctx context.Context, id platform.ID, tu platform.TaskUpdate) (*platform.Task, error) {
|
||||
authz, err := i.FindAuthorizationByToken(ctx, tu.Token)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return &platform.Task{ID: taskID, OrganizationID: 99, AuthorizationID: authz.ID, Name: "x"}, nil
|
||||
},
|
||||
}
|
||||
|
||||
i := inmem.NewService()
|
||||
h := NewTaskHandler(&TaskBackend{
|
||||
Logger: zaptest.NewLogger(t),
|
||||
|
||||
|
@ -992,7 +1003,57 @@ func TestTaskHandler_CreateTaskFromSession(t *testing.T) {
|
|||
}
|
||||
|
||||
// The task should have been created with a valid token.
|
||||
if _, err := i.FindAuthorizationByToken(ctx, createdTasks[0].Token); err != nil {
|
||||
var createdTask platform.Task
|
||||
if err := json.Unmarshal([]byte(body), &createdTask); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
authz, err := i.FindAuthorizationByID(ctx, createdTask.AuthorizationID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if authz.UserID != u.ID {
|
||||
t.Fatalf("expected authorization to be associated with user %v, got %v", u.ID, authz.UserID)
|
||||
}
|
||||
if authz.OrgID != o.ID {
|
||||
t.Fatalf("expected authorization to be associated with org %v, got %v", o.ID, authz.OrgID)
|
||||
}
|
||||
const expDesc = `auto-generated authorization for task "x"`
|
||||
if authz.Description != expDesc {
|
||||
t.Fatalf("expected authorization to be created with description %q, got %q", expDesc, authz.Description)
|
||||
}
|
||||
|
||||
// The authorization should be allowed to read and write the target buckets,
|
||||
// and it should be allowed to read its task.
|
||||
if !authz.Allowed(platform.Permission{
|
||||
Action: platform.ReadAction,
|
||||
Resource: platform.Resource{
|
||||
Type: platform.BucketsResourceType,
|
||||
OrgID: &o.ID,
|
||||
ID: &bSrc.ID,
|
||||
},
|
||||
}) {
|
||||
t.Logf("WARNING: permissions on `from` buckets not yet accessible: update test after https://github.com/influxdata/flux/issues/114 is fixed.")
|
||||
}
|
||||
|
||||
if !authz.Allowed(platform.Permission{
|
||||
Action: platform.WriteAction,
|
||||
Resource: platform.Resource{
|
||||
Type: platform.BucketsResourceType,
|
||||
OrgID: &o.ID,
|
||||
ID: &bDst.ID,
|
||||
},
|
||||
}) {
|
||||
t.Fatalf("expected authorization to be allowed write access to destination bucket, but it wasn't allowed")
|
||||
}
|
||||
|
||||
if !authz.Allowed(platform.Permission{
|
||||
Action: platform.ReadAction,
|
||||
Resource: platform.Resource{
|
||||
Type: platform.TasksResourceType,
|
||||
OrgID: &o.ID,
|
||||
ID: &taskID,
|
||||
},
|
||||
}) {
|
||||
t.Fatalf("expected authorization to be allowed to read its task, but it wasn't allowed")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue