feat(task): impersonate user on task execution (#14675)

* feat(task): impersonate user on task execution

Passing tokens to tasks is cumbersome and we needed a way to more easily create tasks. With this change we no longer need a token on task create. We take the user that created the task and pass that in as the "owner". As far as the task is concerned the owner is the source of permissions.

This is done by adding an additional field on task create that is OwnerID. We will no longer respect the token passed in and it will be deprecated soon.

Things to do still:
Task updates need to allow for owners to be set.
pull/14684/head
Lyon Hill 2019-08-15 18:31:52 -06:00 committed by GitHub
parent 53f6806df1
commit a8d7870689
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 182 additions and 155 deletions

View File

@ -118,9 +118,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC
span, ctx := tracing.StartSpanFromContext(ctx) span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish() defer span.Finish()
if t.Token == "" { // TODO: add owner check
return nil, influxdb.ErrMissingToken // if t.Token == "" {
} // return nil, influxdb.ErrMissingToken
// }
if t.Type == influxdb.TaskTypeWildcard { if t.Type == influxdb.TaskTypeWildcard {
return nil, influxdb.ErrInvalidTaskType return nil, influxdb.ErrInvalidTaskType

View File

@ -438,6 +438,13 @@ func decodePostTaskRequest(ctx context.Context, r *http.Request) (*postTaskReque
return nil, err return nil, err
} }
// pull auth from ctx, populate OwnerID
auth, err := pcontext.GetAuthorizer(ctx)
if err != nil {
return nil, err
}
tc.OwnerID = auth.GetUserID()
if err := tc.Validate(); err != nil { if err := tc.Validate(); err != nil {
return nil, err return nil, err
} }
@ -1353,10 +1360,6 @@ func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*p
span, _ := tracing.StartSpanFromContext(ctx) span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish() defer span.Finish()
if tc.Token == "" {
return nil, influxdb.ErrMissingToken
}
u, err := NewURL(t.Addr, tasksPath) u, err := NewURL(t.Addr, tasksPath)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -89,6 +89,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
Name: "task1", Name: "task1",
Description: "A little Task", Description: "A little Task",
OrganizationID: 1, OrganizationID: 1,
OwnerID: 1,
Organization: "test", Organization: "test",
AuthorizationID: 0x100, AuthorizationID: 0x100,
}, },
@ -96,6 +97,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
ID: 2, ID: 2,
Name: "task2", Name: "task2",
OrganizationID: 2, OrganizationID: 2,
OwnerID: 2,
Organization: "test", Organization: "test",
AuthorizationID: 0x200, AuthorizationID: 0x200,
}, },
@ -149,9 +151,9 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
} }
], ],
"orgID": "0000000000000001", "orgID": "0000000000000001",
"ownerID": "0000000000000001",
"org": "test", "org": "test",
"status": "", "status": "",
"authorizationID": "0000000000000100",
"flux": "" "flux": ""
}, },
{ {
@ -175,9 +177,9 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
} }
], ],
"orgID": "0000000000000002", "orgID": "0000000000000002",
"ownerID": "0000000000000002",
"org": "test", "org": "test",
"status": "", "status": "",
"authorizationID": "0000000000000200",
"flux": "" "flux": ""
} }
] ]
@ -195,6 +197,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
ID: 2, ID: 2,
Name: "task2", Name: "task2",
OrganizationID: 2, OrganizationID: 2,
OwnerID: 2,
Organization: "test", Organization: "test",
AuthorizationID: 0x200, AuthorizationID: 0x200,
}, },
@ -247,10 +250,10 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
} }
} }
], ],
"orgID": "0000000000000002", "orgID": "0000000000000002",
"ownerID": "0000000000000002",
"org": "test", "org": "test",
"status": "", "status": "",
"authorizationID": "0000000000000200",
"flux": "" "flux": ""
} }
] ]
@ -268,6 +271,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
ID: 2, ID: 2,
Name: "task2", Name: "task2",
OrganizationID: 2, OrganizationID: 2,
OwnerID: 2,
Organization: "test2", Organization: "test2",
AuthorizationID: 0x200, AuthorizationID: 0x200,
}, },
@ -320,9 +324,9 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
} }
], ],
"orgID": "0000000000000002", "orgID": "0000000000000002",
"ownerID": "0000000000000002",
"org": "test2", "org": "test2",
"status": "", "status": "",
"authorizationID": "0000000000000200",
"flux": "" "flux": ""
} }
] ]
@ -340,6 +344,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
ID: 1, ID: 1,
Name: "task1", Name: "task1",
OrganizationID: 1, OrganizationID: 1,
OwnerID: 1,
Organization: "test2", Organization: "test2",
AuthorizationID: 0x100, AuthorizationID: 0x100,
}, },
@ -347,6 +352,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) {
ID: 2, ID: 2,
Name: "task2", Name: "task2",
OrganizationID: 2, OrganizationID: 2,
OwnerID: 2,
Organization: "test2", Organization: "test2",
AuthorizationID: 0x200, AuthorizationID: 0x200,
}, },
@ -450,13 +456,13 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
taskService: &mock.TaskService{ taskService: &mock.TaskService{
CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) { CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
return &platform.Task{ return &platform.Task{
ID: 1, ID: 1,
Name: "task1", Name: "task1",
Description: "Brand New Task", Description: "Brand New Task",
OrganizationID: 1, OrganizationID: 1,
Organization: "test", OwnerID: 1,
AuthorizationID: 0x100, Organization: "test",
Flux: "abc", Flux: "abc",
}, nil }, nil
}, },
}, },
@ -479,9 +485,9 @@ func TestTaskHandler_handlePostTasks(t *testing.T) {
"description": "Brand New Task", "description": "Brand New Task",
"labels": [], "labels": [],
"orgID": "0000000000000001", "orgID": "0000000000000001",
"ownerID": "0000000000000001",
"org": "test", "org": "test",
"status": "", "status": "",
"authorizationID": "0000000000000100",
"flux": "abc" "flux": "abc"
} }
`, `,
@ -1239,11 +1245,8 @@ func TestTaskHandler_CreateTaskWithOrgName(t *testing.T) {
if tc.OrganizationID != o.ID { if tc.OrganizationID != o.ID {
t.Fatalf("expected task to be created with org ID %s, got %s", o.ID, tc.OrganizationID) t.Fatalf("expected task to be created with org ID %s, got %s", o.ID, tc.OrganizationID)
} }
if tc.Token != authz.Token {
t.Fatalf("expected task to be created with previous token %s, got %s", authz.Token, tc.Token)
}
return &platform.Task{ID: 9, OrganizationID: o.ID, AuthorizationID: authz.ID, Name: "x", Flux: tc.Flux}, nil return &platform.Task{ID: 9, OrganizationID: o.ID, OwnerID: o.ID, AuthorizationID: authz.ID, Name: "x", Flux: tc.Flux}, nil
}, },
} }

View File

@ -92,8 +92,18 @@ func (s *Service) findSession(ctx context.Context, tx Tx, key string) (*influxdb
} }
} }
ps, err := s.maxPermissions(ctx, tx, sn.UserID)
if err != nil {
return nil, err
}
sn.Permissions = ps
return sn, nil
}
func (s *Service) maxPermissions(ctx context.Context, tx Tx, userID influxdb.ID) ([]influxdb.Permission, error) {
// TODO(desa): these values should be cached so it's not so expensive to lookup each time. // TODO(desa): these values should be cached so it's not so expensive to lookup each time.
f := influxdb.UserResourceMappingFilter{UserID: sn.UserID} f := influxdb.UserResourceMappingFilter{UserID: userID}
mappings, err := s.findUserResourceMappings(ctx, tx, f) mappings, err := s.findUserResourceMappings(ctx, tx, f)
if err != nil { if err != nil {
return nil, &influxdb.Error{ return nil, &influxdb.Error{
@ -112,11 +122,11 @@ func (s *Service) findSession(ctx context.Context, tx Tx, key string) (*influxdb
ps = append(ps, p...) ps = append(ps, p...)
} }
ps = append(ps, influxdb.MePermissions(sn.UserID)...) ps = append(ps, influxdb.MePermissions(userID)...)
// TODO(desa): this is super expensive, we should keep a list of a users maximal privileges somewhere // TODO(desa): this is super expensive, we should keep a list of a users maximal privileges somewhere
// we did this so that the oper token would be used in a users permissions. // we did this so that the oper token would be used in a users permissions.
af := influxdb.AuthorizationFilter{UserID: &sn.UserID} af := influxdb.AuthorizationFilter{UserID: &userID}
as, err := s.findAuthorizations(ctx, tx, af) as, err := s.findAuthorizations(ctx, tx, af)
if err != nil { if err != nil {
return nil, err return nil, err
@ -125,8 +135,7 @@ func (s *Service) findSession(ctx context.Context, tx Tx, key string) (*influxdb
ps = append(ps, a.Permissions...) ps = append(ps, a.Permissions...)
} }
sn.Permissions = ps return ps, nil
return sn, nil
} }
// PutSession puts the session at key. // PutSession puts the session at key.

View File

@ -112,6 +112,34 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf
t.LatestCompleted = t.CreatedAt t.LatestCompleted = t.CreatedAt
} }
if !t.OwnerID.Valid() {
authType := struct {
AuthorizationID influxdb.ID `json:"authorizationID"`
}{}
if err := json.Unmarshal(v, &authType); err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
auth, err := s.findAuthorizationByID(ctx, tx, authType.AuthorizationID)
if err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
t.OwnerID = auth.GetUserID()
}
// populate task Auth
ps, err := s.maxPermissions(ctx, tx, t.OwnerID)
if err != nil {
return nil, err
}
t.Authorization = &influxdb.Authorization{
Status: influxdb.Active,
ID: influxdb.ID(1),
OrgID: t.OrganizationID,
Permissions: ps,
}
return t, nil return t, nil
} }
@ -470,15 +498,7 @@ func (s *Service) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*infl
} }
func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) (*influxdb.Task, error) { func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) (*influxdb.Task, error) {
if tc.Token == "" { var err error
return nil, influxdb.ErrMissingToken
}
auth, err := s.findAuthorizationByToken(ctx, tx, tc.Token)
if err != nil {
return nil, err
}
var org *influxdb.Organization var org *influxdb.Organization
if tc.OrganizationID.Valid() { if tc.OrganizationID.Valid() {
org, err = s.findOrganizationByID(ctx, tx, tc.OrganizationID) org, err = s.findOrganizationByID(ctx, tx, tc.OrganizationID)
@ -510,7 +530,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
Type: tc.Type, Type: tc.Type,
OrganizationID: org.ID, OrganizationID: org.ID,
Organization: org.Name, Organization: org.Name,
AuthorizationID: auth.Identifier(), OwnerID: tc.OwnerID,
Name: opt.Name, Name: opt.Name,
Description: tc.Description, Description: tc.Description,
Status: tc.Status, Status: tc.Status,
@ -565,6 +585,15 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
s.Logger.Info("error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) s.Logger.Info("error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err))
} }
// populate permissions so the task can be used immediately
// if we cant populate here we shouldn't error.
ps, _ := s.maxPermissions(ctx, tx, task.OwnerID)
task.Authorization = &influxdb.Authorization{
Status: influxdb.Active,
ID: influxdb.ID(1),
OrgID: task.OrganizationID,
Permissions: ps,
}
return task, nil return task, nil
} }

View File

@ -120,7 +120,7 @@ func TestNextRunDue(t *testing.T) {
task, err := service.CreateTask(ctx, influxdb.TaskCreate{ task, err := service.CreateTask(ctx, influxdb.TaskCreate{
Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`, Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`,
OrganizationID: o.ID, OrganizationID: o.ID,
Token: authz.Token, OwnerID: u.ID,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

35
task.go
View File

@ -26,21 +26,23 @@ const (
// Task is a task. 🎊 // Task is a task. 🎊
type Task struct { type Task struct {
ID ID `json:"id"` ID ID `json:"id"`
Type string `json:"type,omitempty"` Type string `json:"type,omitempty"`
OrganizationID ID `json:"orgID"` OrganizationID ID `json:"orgID"`
Organization string `json:"org"` Organization string `json:"org"`
AuthorizationID ID `json:"authorizationID"` AuthorizationID ID `json:"-"`
Name string `json:"name"` Authorization *Authorization `json:"-"`
Description string `json:"description,omitempty"` OwnerID ID `json:"ownerID"`
Status string `json:"status"` Name string `json:"name"`
Flux string `json:"flux"` Description string `json:"description,omitempty"`
Every string `json:"every,omitempty"` Status string `json:"status"`
Cron string `json:"cron,omitempty"` Flux string `json:"flux"`
Offset string `json:"offset,omitempty"` Every string `json:"every,omitempty"`
LatestCompleted string `json:"latestCompleted,omitempty"` Cron string `json:"cron,omitempty"`
CreatedAt string `json:"createdAt,omitempty"` Offset string `json:"offset,omitempty"`
UpdatedAt string `json:"updatedAt,omitempty"` LatestCompleted string `json:"latestCompleted,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
UpdatedAt string `json:"updatedAt,omitempty"`
} }
// EffectiveCron returns the effective cron string of the options. // EffectiveCron returns the effective cron string of the options.
@ -143,7 +145,8 @@ type TaskCreate struct {
Status string `json:"status,omitempty"` Status string `json:"status,omitempty"`
OrganizationID ID `json:"orgID,omitempty"` OrganizationID ID `json:"orgID,omitempty"`
Organization string `json:"org,omitempty"` Organization string `json:"org,omitempty"`
Token string `json:"token,omitempty"` Token string `json:"-"`
OwnerID ID `json:"-"`
} }
func (t TaskCreate) Validate() error { func (t TaskCreate) Validate() error {

View File

@ -189,8 +189,9 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
// so we are faking a read only permission to the org's system bucket // so we are faking a read only permission to the org's system bucket
runSystemBucketID := taskSystemBucketID runSystemBucketID := taskSystemBucketID
runAuth := &influxdb.Authorization{ runAuth := &influxdb.Authorization{
ID: taskSystemBucketID, Status: influxdb.Active,
OrgID: task.OrganizationID, ID: taskSystemBucketID,
OrgID: task.OrganizationID,
Permissions: []influxdb.Permission{ Permissions: []influxdb.Permission{
influxdb.Permission{ influxdb.Permission{
Action: influxdb.ReadAction, Action: influxdb.ReadAction,

View File

@ -56,13 +56,8 @@ func (e *queryServiceExecutor) Execute(ctx context.Context, run backend.QueuedRu
return nil, err return nil, err
} }
auth, err := e.as.FindAuthorizationByID(ctx, influxdb.ID(t.AuthorizationID))
if err != nil {
return nil, err
}
// TODO(goller): remove need for context authorization. // TODO(goller): remove need for context authorization.
return newSyncRunPromise(icontext.SetAuthorizer(ctx, auth), auth, run, e, t), nil return newSyncRunPromise(icontext.SetAuthorizer(ctx, t.Authorization), t.Authorization, run, e, t), nil
} }
func (e *queryServiceExecutor) Wait() { func (e *queryServiceExecutor) Wait() {
@ -233,18 +228,13 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que
return nil, err return nil, err
} }
auth, err := e.as.FindAuthorizationByID(ctx, influxdb.ID(t.AuthorizationID))
if err != nil {
return nil, err
}
pkg, err := flux.Parse(t.Flux) pkg, err := flux.Parse(t.Flux)
if err != nil { if err != nil {
return nil, err return nil, err
} }
req := &query.Request{ req := &query.Request{
Authorization: auth, Authorization: t.Authorization,
OrganizationID: t.OrganizationID, OrganizationID: t.OrganizationID,
Compiler: lang.ASTCompiler{ Compiler: lang.ASTCompiler{
AST: pkg, AST: pkg,
@ -252,7 +242,7 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que
}, },
} }
// Only set the authorizer on the context where we need it here. // Only set the authorizer on the context where we need it here.
q, err := e.qs.Query(icontext.SetAuthorizer(ctx, auth), req) q, err := e.qs.Query(icontext.SetAuthorizer(ctx, t.Authorization), req)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -335,7 +335,7 @@ func testExecutorQuerySuccess(t *testing.T, fn createSysFn) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth)
task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -379,15 +379,6 @@ func testExecutorQuerySuccess(t *testing.T, fn createSysFn) {
if !reflect.DeepEqual(res, res2) { if !reflect.DeepEqual(res, res2) {
t.Fatalf("second call to wait returned a different result: %#v", res2) t.Fatalf("second call to wait returned a different result: %#v", res2)
} }
// The query must have received the appropriate authorizer.
qa, err := icontext.GetAuthorizer(sys.svc.mostRecentCtx)
if err != nil {
t.Fatal(err)
}
if qa.Identifier() != tc.Auth.ID {
t.Fatalf("expected query authorizer to have ID %v, got %v", tc.Auth.ID, qa.Identifier())
}
}) })
} }
@ -398,7 +389,7 @@ func testExecutorQueryFailure(t *testing.T, fn createSysFn) {
t.Parallel() t.Parallel()
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth)
task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -428,7 +419,7 @@ func testExecutorPromiseCancel(t *testing.T, fn createSysFn) {
t.Parallel() t.Parallel()
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth)
task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -457,7 +448,7 @@ func testExecutorServiceError(t *testing.T, fn createSysFn) {
t.Parallel() t.Parallel()
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth)
task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -525,7 +516,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx = icontext.SetAuthorizer(ctx, tc.Auth) ctx = icontext.SetAuthorizer(ctx, tc.Auth)
task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -565,7 +556,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth)
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -606,7 +597,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth)
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -646,7 +637,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth)
task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -213,17 +213,12 @@ func (e *TaskExecutor) createPromise(ctx context.Context, run *influxdb.Run) (*P
return nil, err return nil, err
} }
auth, err := e.as.FindAuthorizationByID(ctx, t.AuthorizationID)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
// create promise // create promise
p := &Promise{ p := &Promise{
run: run, run: run,
task: t, task: t,
auth: auth, auth: t.Authorization,
done: make(chan struct{}), done: make(chan struct{}),
ctx: ctx, ctx: ctx,
cancelFunc: cancel, cancelFunc: cancel,

View File

@ -64,7 +64,7 @@ func testQuerySuccess(t *testing.T) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -100,7 +100,7 @@ func testQueryFailure(t *testing.T) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -136,7 +136,7 @@ func testManualRun(t *testing.T) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -184,7 +184,7 @@ func testResumingRun(t *testing.T) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -233,7 +233,7 @@ func testWorkerLimit(t *testing.T) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -263,7 +263,7 @@ func testLimitFunc(t *testing.T) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -310,7 +310,7 @@ func testMetrics(t *testing.T) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -357,7 +357,7 @@ func testMetrics(t *testing.T) {
} }
// manual runs metrics // manual runs metrics
mt, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) mt, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -395,7 +395,7 @@ func testIteratorFailure(t *testing.T) {
script := fmt.Sprintf(fmtTestScript, t.Name()) script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -160,13 +160,12 @@ type System struct {
func testTaskCRUD(t *testing.T, sys *System) { func testTaskCRUD(t *testing.T, sys *System) {
cr := creds(t, sys) cr := creds(t, sys)
authzID := cr.AuthorizationID
// Create a task. // Create a task.
tc := influxdb.TaskCreate{ tc := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()) authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
@ -188,17 +187,18 @@ func testTaskCRUD(t *testing.T, sys *System) {
return nil, fmt.Errorf("failed to find task by id %s", id) return nil, fmt.Errorf("failed to find task by id %s", id)
} }
// should not be able to create a task without a token // TODO: replace with ErrMissingOwner test
noToken := influxdb.TaskCreate{ // // should not be able to create a task without a token
OrganizationID: cr.OrgID, // noToken := influxdb.TaskCreate{
Flux: fmt.Sprintf(scriptFmt, 0), // OrganizationID: cr.OrgID,
// Token: cr.Token, // should fail // Flux: fmt.Sprintf(scriptFmt, 0),
} // // OwnerID: cr.UserID, // should fail
_, err = sys.TaskService.CreateTask(authorizedCtx, noToken) // }
// _, err = sys.TaskService.CreateTask(authorizedCtx, noToken)
if err != influxdb.ErrMissingToken { // if err != influxdb.ErrMissingToken {
t.Fatalf("expected error missing token, got: %v", err) // t.Fatalf("expected error missing token, got: %v", err)
} // }
// Look up a task the different ways we can. // Look up a task the different ways we can.
// Map of method name to found task. // Map of method name to found task.
@ -239,7 +239,9 @@ func testTaskCRUD(t *testing.T, sys *System) {
LatestCompleted: tsk.LatestCompleted, LatestCompleted: tsk.LatestCompleted,
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Organization: cr.Org, Organization: cr.Org,
AuthorizationID: authzID, AuthorizationID: tsk.AuthorizationID,
Authorization: tsk.Authorization,
OwnerID: tsk.OwnerID,
Name: "task #0", Name: "task #0",
Cron: "* * * * *", Cron: "* * * * *",
Offset: "5s", Offset: "5s",
@ -257,7 +259,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
tc2 := influxdb.TaskCreate{ tc2 := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 1), Flux: fmt.Sprintf(scriptFmt, 1),
Token: cr.Token, OwnerID: cr.UserID,
} }
if _, err := sys.TaskService.CreateTask(authorizedCtx, tc2); err != nil { if _, err := sys.TaskService.CreateTask(authorizedCtx, tc2); err != nil {
@ -381,32 +383,32 @@ func testTaskCRUD(t *testing.T, sys *System) {
t.Fatalf("expected task status to be active, got %q", f.Status) t.Fatalf("expected task status to be active, got %q", f.Status)
} }
// Update task: use a new token on the context and modify some other option. // // Update task: use a new token on the context and modify some other option.
// Ensure the authorization doesn't change -- it did change at one time, which was bug https://github.com/influxdata/influxdb/issues/12218. // // Ensure the authorization doesn't change -- it did change at one time, which was bug https://github.com/influxdata/influxdb/issues/12218.
newAuthz := &influxdb.Authorization{OrgID: cr.OrgID, UserID: cr.UserID, Permissions: influxdb.OperPermissions()} // newAuthz := &influxdb.Authorization{OrgID: cr.OrgID, UserID: cr.UserID, Permissions: influxdb.OperPermissions()}
if err := sys.I.CreateAuthorization(sys.Ctx, newAuthz); err != nil { // if err := sys.I.CreateAuthorization(sys.Ctx, newAuthz); err != nil {
t.Fatal(err) // t.Fatal(err)
} // }
newAuthorizedCtx := icontext.SetAuthorizer(sys.Ctx, newAuthz) // newAuthorizedCtx := icontext.SetAuthorizer(sys.Ctx, newAuthz)
f, err = sys.TaskService.UpdateTask(newAuthorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Name: "foo"}}) // f, err = sys.TaskService.UpdateTask(newAuthorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Name: "foo"}})
if err != nil { // if err != nil {
t.Fatal(err) // t.Fatal(err)
} // }
if f.Name != "foo" { // if f.Name != "foo" {
t.Fatalf("expected name to update to foo, got %s", f.Name) // t.Fatalf("expected name to update to foo, got %s", f.Name)
} // }
if f.AuthorizationID != authzID { // if f.AuthorizationID != authzID {
t.Fatalf("expected authorization ID to remain %v, got %v", authzID, f.AuthorizationID) // t.Fatalf("expected authorization ID to remain %v, got %v", authzID, f.AuthorizationID)
} // }
// Now actually update to use the new token, from the original authorization context. // // Now actually update to use the new token, from the original authorization context.
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Token: newAuthz.Token}) // f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Token: newAuthz.Token})
if err != nil { // if err != nil {
t.Fatal(err) // t.Fatal(err)
} // }
if f.AuthorizationID != newAuthz.ID { // if f.AuthorizationID != newAuthz.ID {
t.Fatalf("expected authorization ID %v, got %v", newAuthz.ID, f.AuthorizationID) // t.Fatalf("expected authorization ID %v, got %v", newAuthz.ID, f.AuthorizationID)
} // }
// Delete task. // Delete task.
if err := sys.TaskService.DeleteTask(sys.Ctx, origID); err != nil { if err := sys.TaskService.DeleteTask(sys.Ctx, origID); err != nil {
@ -441,7 +443,7 @@ from(bucket: "b")
ct := influxdb.TaskCreate{ ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: script, Flux: script,
Token: cr.Token, OwnerID: cr.UserID,
} }
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()) authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
task, err := sys.TaskService.CreateTask(authorizedCtx, ct) task, err := sys.TaskService.CreateTask(authorizedCtx, ct)
@ -505,7 +507,7 @@ func testUpdate(t *testing.T, sys *System) {
ct := influxdb.TaskCreate{ ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()) authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
task, err := sys.TaskService.CreateTask(authorizedCtx, ct) task, err := sys.TaskService.CreateTask(authorizedCtx, ct)
@ -609,7 +611,7 @@ func testTaskRuns(t *testing.T, sys *System) {
ct := influxdb.TaskCreate{ ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil { if err != nil {
@ -724,7 +726,7 @@ func testTaskRuns(t *testing.T, sys *System) {
ct := influxdb.TaskCreate{ ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil { if err != nil {
@ -754,7 +756,7 @@ func testTaskRuns(t *testing.T, sys *System) {
ct := influxdb.TaskCreate{ ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil { if err != nil {
@ -990,7 +992,7 @@ func testTaskConcurrency(t *testing.T, sys *System) {
createTaskCh <- influxdb.TaskCreate{ createTaskCh <- influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, i), Flux: fmt.Sprintf(scriptFmt, i),
Token: cr.Token, OwnerID: cr.UserID,
} }
} }
@ -1007,7 +1009,7 @@ func testManualRun(t *testing.T, s *System) {
tc := influxdb.TaskCreate{ tc := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
authorizedCtx := icontext.SetAuthorizer(s.Ctx, cr.Authorizer()) authorizedCtx := icontext.SetAuthorizer(s.Ctx, cr.Authorizer())
@ -1051,7 +1053,7 @@ func testRunStorage(t *testing.T, sys *System) {
ct := influxdb.TaskCreate{ ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil { if err != nil {
@ -1229,7 +1231,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) {
ct := influxdb.TaskCreate{ ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil { if err != nil {
@ -1299,7 +1301,7 @@ func testLogsAcrossStorage(t *testing.T, sys *System) {
ct := influxdb.TaskCreate{ ct := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct)
if err != nil { if err != nil {
@ -1481,7 +1483,7 @@ func testTaskType(t *testing.T, sys *System) {
ts := influxdb.TaskCreate{ ts := influxdb.TaskCreate{
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
tsk, err := sys.TaskService.CreateTask(authorizedCtx, ts) tsk, err := sys.TaskService.CreateTask(authorizedCtx, ts)
@ -1496,7 +1498,7 @@ func testTaskType(t *testing.T, sys *System) {
Type: "cows", Type: "cows",
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
tskCow, err := sys.TaskService.CreateTask(authorizedCtx, tc) tskCow, err := sys.TaskService.CreateTask(authorizedCtx, tc)
@ -1511,7 +1513,7 @@ func testTaskType(t *testing.T, sys *System) {
Type: "pigs", Type: "pigs",
OrganizationID: cr.OrgID, OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0), Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token, OwnerID: cr.UserID,
} }
tskPig, err := sys.TaskService.CreateTask(authorizedCtx, tp) tskPig, err := sys.TaskService.CreateTask(authorizedCtx, tp)