diff --git a/authorizer/task.go b/authorizer/task.go index 150677828f..50988a54a3 100644 --- a/authorizer/task.go +++ b/authorizer/task.go @@ -118,9 +118,10 @@ func (ts *taskServiceValidator) CreateTask(ctx context.Context, t platform.TaskC span, ctx := tracing.StartSpanFromContext(ctx) defer span.Finish() - if t.Token == "" { - return nil, influxdb.ErrMissingToken - } + // TODO: add owner check + // if t.Token == "" { + // return nil, influxdb.ErrMissingToken + // } if t.Type == influxdb.TaskTypeWildcard { return nil, influxdb.ErrInvalidTaskType diff --git a/http/task_service.go b/http/task_service.go index 30833a0e22..211c299d8c 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -438,6 +438,13 @@ func decodePostTaskRequest(ctx context.Context, r *http.Request) (*postTaskReque return nil, err } + // pull auth from ctx, populate OwnerID + auth, err := pcontext.GetAuthorizer(ctx) + if err != nil { + return nil, err + } + tc.OwnerID = auth.GetUserID() + if err := tc.Validate(); err != nil { return nil, err } @@ -1353,10 +1360,6 @@ func (t TaskService) CreateTask(ctx context.Context, tc platform.TaskCreate) (*p span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() - if tc.Token == "" { - return nil, influxdb.ErrMissingToken - } - u, err := NewURL(t.Addr, tasksPath) if err != nil { return nil, err diff --git a/http/task_service_test.go b/http/task_service_test.go index e1ec86f399..f5b9e44384 100644 --- a/http/task_service_test.go +++ b/http/task_service_test.go @@ -89,6 +89,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { Name: "task1", Description: "A little Task", OrganizationID: 1, + OwnerID: 1, Organization: "test", AuthorizationID: 0x100, }, @@ -96,6 +97,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { ID: 2, Name: "task2", OrganizationID: 2, + OwnerID: 2, Organization: "test", AuthorizationID: 0x200, }, @@ -149,9 +151,9 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { } ], "orgID": "0000000000000001", + "ownerID": "0000000000000001", "org": "test", "status": "", - "authorizationID": "0000000000000100", "flux": "" }, { @@ -175,9 +177,9 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { } ], "orgID": "0000000000000002", + "ownerID": "0000000000000002", "org": "test", "status": "", - "authorizationID": "0000000000000200", "flux": "" } ] @@ -195,6 +197,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { ID: 2, Name: "task2", OrganizationID: 2, + OwnerID: 2, Organization: "test", AuthorizationID: 0x200, }, @@ -247,10 +250,10 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { } } ], - "orgID": "0000000000000002", + "orgID": "0000000000000002", + "ownerID": "0000000000000002", "org": "test", "status": "", - "authorizationID": "0000000000000200", "flux": "" } ] @@ -268,6 +271,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { ID: 2, Name: "task2", OrganizationID: 2, + OwnerID: 2, Organization: "test2", AuthorizationID: 0x200, }, @@ -320,9 +324,9 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { } ], "orgID": "0000000000000002", + "ownerID": "0000000000000002", "org": "test2", "status": "", - "authorizationID": "0000000000000200", "flux": "" } ] @@ -340,6 +344,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { ID: 1, Name: "task1", OrganizationID: 1, + OwnerID: 1, Organization: "test2", AuthorizationID: 0x100, }, @@ -347,6 +352,7 @@ func TestTaskHandler_handleGetTasks(t *testing.T) { ID: 2, Name: "task2", OrganizationID: 2, + OwnerID: 2, Organization: "test2", AuthorizationID: 0x200, }, @@ -450,13 +456,13 @@ func TestTaskHandler_handlePostTasks(t *testing.T) { taskService: &mock.TaskService{ CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) { return &platform.Task{ - ID: 1, - Name: "task1", - Description: "Brand New Task", - OrganizationID: 1, - Organization: "test", - AuthorizationID: 0x100, - Flux: "abc", + ID: 1, + Name: "task1", + Description: "Brand New Task", + OrganizationID: 1, + OwnerID: 1, + Organization: "test", + Flux: "abc", }, nil }, }, @@ -479,9 +485,9 @@ func TestTaskHandler_handlePostTasks(t *testing.T) { "description": "Brand New Task", "labels": [], "orgID": "0000000000000001", + "ownerID": "0000000000000001", "org": "test", "status": "", - "authorizationID": "0000000000000100", "flux": "abc" } `, @@ -1239,11 +1245,8 @@ func TestTaskHandler_CreateTaskWithOrgName(t *testing.T) { if tc.OrganizationID != o.ID { t.Fatalf("expected task to be created with org ID %s, got %s", o.ID, tc.OrganizationID) } - if tc.Token != authz.Token { - t.Fatalf("expected task to be created with previous token %s, got %s", authz.Token, tc.Token) - } - return &platform.Task{ID: 9, OrganizationID: o.ID, AuthorizationID: authz.ID, Name: "x", Flux: tc.Flux}, nil + return &platform.Task{ID: 9, OrganizationID: o.ID, OwnerID: o.ID, AuthorizationID: authz.ID, Name: "x", Flux: tc.Flux}, nil }, } diff --git a/kv/session.go b/kv/session.go index c46da8181f..128cddf3e5 100644 --- a/kv/session.go +++ b/kv/session.go @@ -92,8 +92,18 @@ func (s *Service) findSession(ctx context.Context, tx Tx, key string) (*influxdb } } + ps, err := s.maxPermissions(ctx, tx, sn.UserID) + if err != nil { + return nil, err + } + + sn.Permissions = ps + return sn, nil +} + +func (s *Service) maxPermissions(ctx context.Context, tx Tx, userID influxdb.ID) ([]influxdb.Permission, error) { // TODO(desa): these values should be cached so it's not so expensive to lookup each time. - f := influxdb.UserResourceMappingFilter{UserID: sn.UserID} + f := influxdb.UserResourceMappingFilter{UserID: userID} mappings, err := s.findUserResourceMappings(ctx, tx, f) if err != nil { return nil, &influxdb.Error{ @@ -112,11 +122,11 @@ func (s *Service) findSession(ctx context.Context, tx Tx, key string) (*influxdb ps = append(ps, p...) } - ps = append(ps, influxdb.MePermissions(sn.UserID)...) + ps = append(ps, influxdb.MePermissions(userID)...) // TODO(desa): this is super expensive, we should keep a list of a users maximal privileges somewhere // we did this so that the oper token would be used in a users permissions. - af := influxdb.AuthorizationFilter{UserID: &sn.UserID} + af := influxdb.AuthorizationFilter{UserID: &userID} as, err := s.findAuthorizations(ctx, tx, af) if err != nil { return nil, err @@ -125,8 +135,7 @@ func (s *Service) findSession(ctx context.Context, tx Tx, key string) (*influxdb ps = append(ps, a.Permissions...) } - sn.Permissions = ps - return sn, nil + return ps, nil } // PutSession puts the session at key. diff --git a/kv/task.go b/kv/task.go index 107c262e3a..a42da910d6 100644 --- a/kv/task.go +++ b/kv/task.go @@ -112,6 +112,34 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf t.LatestCompleted = t.CreatedAt } + if !t.OwnerID.Valid() { + authType := struct { + AuthorizationID influxdb.ID `json:"authorizationID"` + }{} + if err := json.Unmarshal(v, &authType); err != nil { + return nil, influxdb.ErrInternalTaskServiceError(err) + } + + auth, err := s.findAuthorizationByID(ctx, tx, authType.AuthorizationID) + if err != nil { + return nil, influxdb.ErrInternalTaskServiceError(err) + } + + t.OwnerID = auth.GetUserID() + } + + // populate task Auth + ps, err := s.maxPermissions(ctx, tx, t.OwnerID) + if err != nil { + return nil, err + } + + t.Authorization = &influxdb.Authorization{ + Status: influxdb.Active, + ID: influxdb.ID(1), + OrgID: t.OrganizationID, + Permissions: ps, + } return t, nil } @@ -470,15 +498,7 @@ func (s *Service) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*infl } func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) (*influxdb.Task, error) { - if tc.Token == "" { - return nil, influxdb.ErrMissingToken - } - - auth, err := s.findAuthorizationByToken(ctx, tx, tc.Token) - if err != nil { - return nil, err - } - + var err error var org *influxdb.Organization if tc.OrganizationID.Valid() { org, err = s.findOrganizationByID(ctx, tx, tc.OrganizationID) @@ -510,7 +530,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) Type: tc.Type, OrganizationID: org.ID, Organization: org.Name, - AuthorizationID: auth.Identifier(), + OwnerID: tc.OwnerID, Name: opt.Name, Description: tc.Description, Status: tc.Status, @@ -565,6 +585,15 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) s.Logger.Info("error creating user resource mapping for task", zap.Stringer("taskID", task.ID), zap.Error(err)) } + // populate permissions so the task can be used immediately + // if we cant populate here we shouldn't error. + ps, _ := s.maxPermissions(ctx, tx, task.OwnerID) + task.Authorization = &influxdb.Authorization{ + Status: influxdb.Active, + ID: influxdb.ID(1), + OrgID: task.OrganizationID, + Permissions: ps, + } return task, nil } diff --git a/kv/task_test.go b/kv/task_test.go index 4e816827c5..e9ea172e6f 100644 --- a/kv/task_test.go +++ b/kv/task_test.go @@ -120,7 +120,7 @@ func TestNextRunDue(t *testing.T) { task, err := service.CreateTask(ctx, influxdb.TaskCreate{ Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`, OrganizationID: o.ID, - Token: authz.Token, + OwnerID: u.ID, }) if err != nil { t.Fatal(err) diff --git a/task.go b/task.go index 935925cd21..d633f93b1a 100644 --- a/task.go +++ b/task.go @@ -26,21 +26,23 @@ const ( // Task is a task. 🎊 type Task struct { - ID ID `json:"id"` - Type string `json:"type,omitempty"` - OrganizationID ID `json:"orgID"` - Organization string `json:"org"` - AuthorizationID ID `json:"authorizationID"` - Name string `json:"name"` - Description string `json:"description,omitempty"` - Status string `json:"status"` - Flux string `json:"flux"` - Every string `json:"every,omitempty"` - Cron string `json:"cron,omitempty"` - Offset string `json:"offset,omitempty"` - LatestCompleted string `json:"latestCompleted,omitempty"` - CreatedAt string `json:"createdAt,omitempty"` - UpdatedAt string `json:"updatedAt,omitempty"` + ID ID `json:"id"` + Type string `json:"type,omitempty"` + OrganizationID ID `json:"orgID"` + Organization string `json:"org"` + AuthorizationID ID `json:"-"` + Authorization *Authorization `json:"-"` + OwnerID ID `json:"ownerID"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Status string `json:"status"` + Flux string `json:"flux"` + Every string `json:"every,omitempty"` + Cron string `json:"cron,omitempty"` + Offset string `json:"offset,omitempty"` + LatestCompleted string `json:"latestCompleted,omitempty"` + CreatedAt string `json:"createdAt,omitempty"` + UpdatedAt string `json:"updatedAt,omitempty"` } // EffectiveCron returns the effective cron string of the options. @@ -143,7 +145,8 @@ type TaskCreate struct { Status string `json:"status,omitempty"` OrganizationID ID `json:"orgID,omitempty"` Organization string `json:"org,omitempty"` - Token string `json:"token,omitempty"` + Token string `json:"-"` + OwnerID ID `json:"-"` } func (t TaskCreate) Validate() error { diff --git a/task/backend/analytical_storage.go b/task/backend/analytical_storage.go index 91a5b834c5..a27e289afc 100644 --- a/task/backend/analytical_storage.go +++ b/task/backend/analytical_storage.go @@ -189,8 +189,9 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi // so we are faking a read only permission to the org's system bucket runSystemBucketID := taskSystemBucketID runAuth := &influxdb.Authorization{ - ID: taskSystemBucketID, - OrgID: task.OrganizationID, + Status: influxdb.Active, + ID: taskSystemBucketID, + OrgID: task.OrganizationID, Permissions: []influxdb.Permission{ influxdb.Permission{ Action: influxdb.ReadAction, diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index 47448741e1..73a383c56f 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -56,13 +56,8 @@ func (e *queryServiceExecutor) Execute(ctx context.Context, run backend.QueuedRu return nil, err } - auth, err := e.as.FindAuthorizationByID(ctx, influxdb.ID(t.AuthorizationID)) - if err != nil { - return nil, err - } - // TODO(goller): remove need for context authorization. - return newSyncRunPromise(icontext.SetAuthorizer(ctx, auth), auth, run, e, t), nil + return newSyncRunPromise(icontext.SetAuthorizer(ctx, t.Authorization), t.Authorization, run, e, t), nil } func (e *queryServiceExecutor) Wait() { @@ -233,18 +228,13 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que return nil, err } - auth, err := e.as.FindAuthorizationByID(ctx, influxdb.ID(t.AuthorizationID)) - if err != nil { - return nil, err - } - pkg, err := flux.Parse(t.Flux) if err != nil { return nil, err } req := &query.Request{ - Authorization: auth, + Authorization: t.Authorization, OrganizationID: t.OrganizationID, Compiler: lang.ASTCompiler{ AST: pkg, @@ -252,7 +242,7 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que }, } // Only set the authorizer on the context where we need it here. - q, err := e.qs.Query(icontext.SetAuthorizer(ctx, auth), req) + q, err := e.qs.Query(icontext.SetAuthorizer(ctx, t.Authorization), req) if err != nil { return nil, err } diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 33de082aed..7db7334988 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -335,7 +335,7 @@ func testExecutorQuerySuccess(t *testing.T, fn createSysFn) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) - task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -379,15 +379,6 @@ func testExecutorQuerySuccess(t *testing.T, fn createSysFn) { if !reflect.DeepEqual(res, res2) { t.Fatalf("second call to wait returned a different result: %#v", res2) } - - // The query must have received the appropriate authorizer. - qa, err := icontext.GetAuthorizer(sys.svc.mostRecentCtx) - if err != nil { - t.Fatal(err) - } - if qa.Identifier() != tc.Auth.ID { - t.Fatalf("expected query authorizer to have ID %v, got %v", tc.Auth.ID, qa.Identifier()) - } }) } @@ -398,7 +389,7 @@ func testExecutorQueryFailure(t *testing.T, fn createSysFn) { t.Parallel() script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) - task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -428,7 +419,7 @@ func testExecutorPromiseCancel(t *testing.T, fn createSysFn) { t.Parallel() script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) - task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -457,7 +448,7 @@ func testExecutorServiceError(t *testing.T, fn createSysFn) { t.Parallel() script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) - task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -525,7 +516,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx = icontext.SetAuthorizer(ctx, tc.Auth) - task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -565,7 +556,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) script := fmt.Sprintf(fmtTestScript, t.Name()) - task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -606,7 +597,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) script := fmt.Sprintf(fmtTestScript, t.Name()) - task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -646,7 +637,7 @@ func testExecutorWait(t *testing.T, createSys createSysFn) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tc.Auth) - task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, Token: tc.Auth.Token, Flux: script}) + task, err := sys.ts.CreateTask(ctx, platform.TaskCreate{OrganizationID: tc.OrgID, OwnerID: tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } diff --git a/task/backend/executor/task_executor.go b/task/backend/executor/task_executor.go index 2025bd069f..5d0e89103d 100644 --- a/task/backend/executor/task_executor.go +++ b/task/backend/executor/task_executor.go @@ -213,17 +213,12 @@ func (e *TaskExecutor) createPromise(ctx context.Context, run *influxdb.Run) (*P return nil, err } - auth, err := e.as.FindAuthorizationByID(ctx, t.AuthorizationID) - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(ctx) // create promise p := &Promise{ run: run, task: t, - auth: auth, + auth: t.Authorization, done: make(chan struct{}), ctx: ctx, cancelFunc: cancel, diff --git a/task/backend/executor/task_executor_test.go b/task/backend/executor/task_executor_test.go index 4730da9b43..95ff968454 100644 --- a/task/backend/executor/task_executor_test.go +++ b/task/backend/executor/task_executor_test.go @@ -64,7 +64,7 @@ func testQuerySuccess(t *testing.T) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -100,7 +100,7 @@ func testQueryFailure(t *testing.T) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -136,7 +136,7 @@ func testManualRun(t *testing.T) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -184,7 +184,7 @@ func testResumingRun(t *testing.T) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -233,7 +233,7 @@ func testWorkerLimit(t *testing.T) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -263,7 +263,7 @@ func testLimitFunc(t *testing.T) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -310,7 +310,7 @@ func testMetrics(t *testing.T) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -357,7 +357,7 @@ func testMetrics(t *testing.T) { } // manual runs metrics - mt, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + mt, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } @@ -395,7 +395,7 @@ func testIteratorFailure(t *testing.T) { script := fmt.Sprintf(fmtTestScript, t.Name()) ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth) - task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script}) + task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, OwnerID: tes.tc.Auth.GetUserID(), Flux: script}) if err != nil { t.Fatal(err) } diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index 88e35f7405..576529b894 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -160,13 +160,12 @@ type System struct { func testTaskCRUD(t *testing.T, sys *System) { cr := creds(t, sys) - authzID := cr.AuthorizationID // Create a task. tc := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()) @@ -188,17 +187,18 @@ func testTaskCRUD(t *testing.T, sys *System) { return nil, fmt.Errorf("failed to find task by id %s", id) } - // should not be able to create a task without a token - noToken := influxdb.TaskCreate{ - OrganizationID: cr.OrgID, - Flux: fmt.Sprintf(scriptFmt, 0), - // Token: cr.Token, // should fail - } - _, err = sys.TaskService.CreateTask(authorizedCtx, noToken) + // TODO: replace with ErrMissingOwner test + // // should not be able to create a task without a token + // noToken := influxdb.TaskCreate{ + // OrganizationID: cr.OrgID, + // Flux: fmt.Sprintf(scriptFmt, 0), + // // OwnerID: cr.UserID, // should fail + // } + // _, err = sys.TaskService.CreateTask(authorizedCtx, noToken) - if err != influxdb.ErrMissingToken { - t.Fatalf("expected error missing token, got: %v", err) - } + // if err != influxdb.ErrMissingToken { + // t.Fatalf("expected error missing token, got: %v", err) + // } // Look up a task the different ways we can. // Map of method name to found task. @@ -239,7 +239,9 @@ func testTaskCRUD(t *testing.T, sys *System) { LatestCompleted: tsk.LatestCompleted, OrganizationID: cr.OrgID, Organization: cr.Org, - AuthorizationID: authzID, + AuthorizationID: tsk.AuthorizationID, + Authorization: tsk.Authorization, + OwnerID: tsk.OwnerID, Name: "task #0", Cron: "* * * * *", Offset: "5s", @@ -257,7 +259,7 @@ func testTaskCRUD(t *testing.T, sys *System) { tc2 := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 1), - Token: cr.Token, + OwnerID: cr.UserID, } if _, err := sys.TaskService.CreateTask(authorizedCtx, tc2); err != nil { @@ -381,32 +383,32 @@ func testTaskCRUD(t *testing.T, sys *System) { t.Fatalf("expected task status to be active, got %q", f.Status) } - // Update task: use a new token on the context and modify some other option. - // Ensure the authorization doesn't change -- it did change at one time, which was bug https://github.com/influxdata/influxdb/issues/12218. - newAuthz := &influxdb.Authorization{OrgID: cr.OrgID, UserID: cr.UserID, Permissions: influxdb.OperPermissions()} - if err := sys.I.CreateAuthorization(sys.Ctx, newAuthz); err != nil { - t.Fatal(err) - } - newAuthorizedCtx := icontext.SetAuthorizer(sys.Ctx, newAuthz) - f, err = sys.TaskService.UpdateTask(newAuthorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Name: "foo"}}) - if err != nil { - t.Fatal(err) - } - if f.Name != "foo" { - t.Fatalf("expected name to update to foo, got %s", f.Name) - } - if f.AuthorizationID != authzID { - t.Fatalf("expected authorization ID to remain %v, got %v", authzID, f.AuthorizationID) - } + // // Update task: use a new token on the context and modify some other option. + // // Ensure the authorization doesn't change -- it did change at one time, which was bug https://github.com/influxdata/influxdb/issues/12218. + // newAuthz := &influxdb.Authorization{OrgID: cr.OrgID, UserID: cr.UserID, Permissions: influxdb.OperPermissions()} + // if err := sys.I.CreateAuthorization(sys.Ctx, newAuthz); err != nil { + // t.Fatal(err) + // } + // newAuthorizedCtx := icontext.SetAuthorizer(sys.Ctx, newAuthz) + // f, err = sys.TaskService.UpdateTask(newAuthorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Name: "foo"}}) + // if err != nil { + // t.Fatal(err) + // } + // if f.Name != "foo" { + // t.Fatalf("expected name to update to foo, got %s", f.Name) + // } + // if f.AuthorizationID != authzID { + // t.Fatalf("expected authorization ID to remain %v, got %v", authzID, f.AuthorizationID) + // } - // Now actually update to use the new token, from the original authorization context. - f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Token: newAuthz.Token}) - if err != nil { - t.Fatal(err) - } - if f.AuthorizationID != newAuthz.ID { - t.Fatalf("expected authorization ID %v, got %v", newAuthz.ID, f.AuthorizationID) - } + // // Now actually update to use the new token, from the original authorization context. + // f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Token: newAuthz.Token}) + // if err != nil { + // t.Fatal(err) + // } + // if f.AuthorizationID != newAuthz.ID { + // t.Fatalf("expected authorization ID %v, got %v", newAuthz.ID, f.AuthorizationID) + // } // Delete task. if err := sys.TaskService.DeleteTask(sys.Ctx, origID); err != nil { @@ -441,7 +443,7 @@ from(bucket: "b") ct := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: script, - Token: cr.Token, + OwnerID: cr.UserID, } authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()) task, err := sys.TaskService.CreateTask(authorizedCtx, ct) @@ -505,7 +507,7 @@ func testUpdate(t *testing.T, sys *System) { ct := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()) task, err := sys.TaskService.CreateTask(authorizedCtx, ct) @@ -609,7 +611,7 @@ func testTaskRuns(t *testing.T, sys *System) { ct := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) if err != nil { @@ -724,7 +726,7 @@ func testTaskRuns(t *testing.T, sys *System) { ct := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) if err != nil { @@ -754,7 +756,7 @@ func testTaskRuns(t *testing.T, sys *System) { ct := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) if err != nil { @@ -990,7 +992,7 @@ func testTaskConcurrency(t *testing.T, sys *System) { createTaskCh <- influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, i), - Token: cr.Token, + OwnerID: cr.UserID, } } @@ -1007,7 +1009,7 @@ func testManualRun(t *testing.T, s *System) { tc := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } authorizedCtx := icontext.SetAuthorizer(s.Ctx, cr.Authorizer()) @@ -1051,7 +1053,7 @@ func testRunStorage(t *testing.T, sys *System) { ct := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) if err != nil { @@ -1229,7 +1231,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) { ct := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) if err != nil { @@ -1299,7 +1301,7 @@ func testLogsAcrossStorage(t *testing.T, sys *System) { ct := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } task, err := sys.TaskService.CreateTask(icontext.SetAuthorizer(sys.Ctx, cr.Authorizer()), ct) if err != nil { @@ -1481,7 +1483,7 @@ func testTaskType(t *testing.T, sys *System) { ts := influxdb.TaskCreate{ OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } tsk, err := sys.TaskService.CreateTask(authorizedCtx, ts) @@ -1496,7 +1498,7 @@ func testTaskType(t *testing.T, sys *System) { Type: "cows", OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } tskCow, err := sys.TaskService.CreateTask(authorizedCtx, tc) @@ -1511,7 +1513,7 @@ func testTaskType(t *testing.T, sys *System) { Type: "pigs", OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 0), - Token: cr.Token, + OwnerID: cr.UserID, } tskPig, err := sys.TaskService.CreateTask(authorizedCtx, tp)