Clean out user from the task store's (#11882)
* Clean out user from the task store'spull/11919/head
parent
eb32f5bfc4
commit
48cc643992
|
@ -402,7 +402,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
|
||||
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
|
||||
lr := taskbackend.NewQueryLogReader(queryService)
|
||||
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), lr, m.scheduler, authSvc)
|
||||
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, boltStore), lr, m.scheduler, authSvc, userResourceSvc)
|
||||
taskSvc = task.NewValidator(taskSvc, bucketSvc)
|
||||
m.taskStore = boltStore
|
||||
}
|
||||
|
|
|
@ -482,29 +482,6 @@ func (h *TaskHandler) handlePostTask(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
// add User resource map
|
||||
urm := &platform.UserResourceMapping{
|
||||
UserID: auth.GetUserID(),
|
||||
UserType: platform.Owner,
|
||||
ResourceType: platform.TasksResourceType,
|
||||
ResourceID: task.ID,
|
||||
}
|
||||
if err := h.UserResourceMappingService.CreateUserResourceMapping(ctx, urm); err != nil {
|
||||
// clean up the task if we fail to map the user and resource
|
||||
// TODO(lh): Multi step creates could benefit from a service wide transactional request
|
||||
if derr := h.TaskService.DeleteTask(ctx, task.ID); derr != nil {
|
||||
err = fmt.Errorf("%s: failed to clean up task: %s", err.Error(), derr.Error())
|
||||
}
|
||||
|
||||
err = &platform.Error{
|
||||
Err: err,
|
||||
Msg: "failed to add user permissions",
|
||||
}
|
||||
|
||||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
if err := encodeResponse(ctx, w, http.StatusCreated, newTaskResponse(*task, []*platform.Label{})); err != nil {
|
||||
logEncodingError(h.logger, r, err)
|
||||
return
|
||||
|
@ -709,21 +686,6 @@ func (h *TaskHandler) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
|
|||
EncodeError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
// clean up resource maps for deleted task
|
||||
urms, _, err := h.UserResourceMappingService.FindUserResourceMappings(ctx, platform.UserResourceMappingFilter{
|
||||
ResourceID: req.TaskID,
|
||||
ResourceType: platform.TasksResourceType,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
h.logger.Warn("failed to pull user resource mapping", zap.Error(err))
|
||||
} else {
|
||||
for _, m := range urms {
|
||||
if err := h.UserResourceMappingService.DeleteUserResourceMapping(ctx, m.ResourceID, m.UserID); err != nil {
|
||||
h.logger.Warn(fmt.Sprintf("failed to remove user resource mapping for task %s", m.ResourceID.String()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
|
|
@ -786,68 +786,6 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestTaskUserResourceMap(t *testing.T) {
|
||||
task := platform.Task{
|
||||
Name: "task1",
|
||||
OrganizationID: 1,
|
||||
AuthorizationID: 0x100,
|
||||
}
|
||||
|
||||
b, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to unmarshal task: %v", err)
|
||||
}
|
||||
|
||||
r := httptest.NewRequest("POST", "http://any.url/v1", bytes.NewReader(b))
|
||||
ctx := pcontext.SetAuthorizer(context.Background(), &platform.Authorization{UserID: 2})
|
||||
r = r.WithContext(ctx)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
var created *platform.UserResourceMapping
|
||||
var deletedUser platform.ID
|
||||
var deletedResource platform.ID
|
||||
|
||||
urms := &mock.UserResourceMappingService{
|
||||
CreateMappingFn: func(_ context.Context, urm *platform.UserResourceMapping) error { created = urm; return nil },
|
||||
DeleteMappingFn: func(_ context.Context, rid platform.ID, uid platform.ID) error {
|
||||
deletedUser = uid
|
||||
deletedResource = rid
|
||||
return nil
|
||||
},
|
||||
FindMappingsFn: func(context.Context, platform.UserResourceMappingFilter) ([]*platform.UserResourceMapping, int, error) {
|
||||
return []*platform.UserResourceMapping{created}, 1, nil
|
||||
},
|
||||
}
|
||||
|
||||
taskBackend := NewMockTaskBackend(t)
|
||||
taskBackend.UserResourceMappingService = urms
|
||||
h := NewTaskHandler(taskBackend)
|
||||
taskID := platform.ID(1)
|
||||
|
||||
h.TaskService = &mock.TaskService{
|
||||
CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
|
||||
taskCopy := task
|
||||
return &taskCopy, nil
|
||||
},
|
||||
DeleteTaskFn: func(ctx context.Context, id platform.ID) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
h.handlePostTask(w, r)
|
||||
r = httptest.NewRequest("DELETE", "http://any.url/api/v2/tasks/"+taskID.String(), nil)
|
||||
|
||||
h.ServeHTTP(w, r)
|
||||
|
||||
if created.UserID != deletedUser {
|
||||
t.Fatalf("deleted user (%s) doesn't match created user (%s)", deletedUser, created.UserID)
|
||||
}
|
||||
|
||||
if created.ResourceID != deletedResource {
|
||||
t.Fatalf("deleted resource (%s) doesn't match created resource (%s)", deletedResource, created.ResourceID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestService_handlePostTaskLabel(t *testing.T) {
|
||||
type fields struct {
|
||||
LabelService platform.LabelService
|
||||
|
|
|
@ -24,7 +24,7 @@ func httpTaskServiceFactory(t *testing.T) (*servicetest.System, context.CancelFu
|
|||
|
||||
i := inmem.NewService()
|
||||
|
||||
backingTS := task.PlatformAdapter(store, rrw, sch, i)
|
||||
backingTS := task.PlatformAdapter(store, rrw, sch, i, i)
|
||||
|
||||
h := http.NewAuthenticationHandler()
|
||||
h.AuthorizationService = i
|
||||
|
|
|
@ -6,11 +6,9 @@
|
|||
// bucket(/tasks/v1/task_meta) key(:task_id) -> Protocol Buffer encoded backend.StoreTaskMeta,
|
||||
// so we have a consistent view of runs in progress and max concurrency.
|
||||
// bucket(/tasks/v1/org_by_task_id) key(task_id) -> The organization ID (stored as encoded string) associated with given task.
|
||||
// bucket(/tasks/v1/user_by_task_id) key(:task_id) -> The user ID (stored as encoded string) associated with given task.
|
||||
// buket(/tasks/v1/name_by_task_id) key(:task_id) -> The user-supplied name of the script.
|
||||
// bucket(/tasks/v1/name_by_task_id) key(:task_id) -> The user-supplied name of the script.
|
||||
// bucket(/tasks/v1/run_ids) -> Counter for run IDs
|
||||
// bucket(/tasks/v1/orgs).bucket(:org_id) key(:task_id) -> Empty content; presence of :task_id allows for lookup from org to tasks.
|
||||
// bucket(/tasks/v1/users).bucket(:user_id) key(:task_id) -> Empty content; presence of :task_id allows for lookup from user to tasks.
|
||||
// Note that task IDs are stored big-endian uint64s for sorting purposes,
|
||||
// but presented to the users with leading 0-bytes stripped.
|
||||
// Like other components of the system, IDs presented to users may be `0f12` rather than `f12`.
|
||||
|
@ -55,10 +53,8 @@ const basePath = "/tasks/v1/"
|
|||
var (
|
||||
tasksPath = []byte(basePath + "tasks")
|
||||
orgsPath = []byte(basePath + "orgs")
|
||||
usersPath = []byte(basePath + "users")
|
||||
taskMetaPath = []byte(basePath + "task_meta")
|
||||
orgByTaskID = []byte(basePath + "org_by_task_id")
|
||||
userByTaskID = []byte(basePath + "user_by_task_id")
|
||||
nameByTaskID = []byte(basePath + "name_by_task_id")
|
||||
runIDs = []byte(basePath + "run_ids")
|
||||
)
|
||||
|
@ -78,9 +74,8 @@ func New(db *bolt.DB, rootBucket string) (*Store, error) {
|
|||
}
|
||||
// create the buckets inside the root
|
||||
for _, b := range [][]byte{
|
||||
tasksPath, orgsPath, usersPath, taskMetaPath,
|
||||
orgByTaskID, userByTaskID,
|
||||
nameByTaskID, runIDs,
|
||||
tasksPath, orgsPath, taskMetaPath,
|
||||
orgByTaskID, nameByTaskID, runIDs,
|
||||
} {
|
||||
_, err := root.CreateBucketIfNotExists(b)
|
||||
if err != nil {
|
||||
|
@ -147,28 +142,6 @@ func (s *Store) CreateTask(ctx context.Context, req backend.CreateTaskRequest) (
|
|||
return err
|
||||
}
|
||||
|
||||
// Encoded user ID
|
||||
encodedUser, err := req.User.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// user
|
||||
userB, err := b.Bucket(usersPath).CreateBucketIfNotExists(encodedUser)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = userB.Put(encodedID, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.Bucket(userByTaskID).Put(encodedID, encodedUser)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stm := backend.NewStoreTaskMeta(req, o)
|
||||
stmBytes, err := stm.Marshal()
|
||||
if err != nil {
|
||||
|
@ -236,10 +209,7 @@ func (s *Store) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (
|
|||
}
|
||||
}
|
||||
|
||||
var userID, orgID platform.ID
|
||||
if err := userID.Decode(b.Bucket(userByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
}
|
||||
var orgID platform.ID
|
||||
|
||||
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
|
@ -274,7 +244,6 @@ func (s *Store) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (
|
|||
res.NewTask = backend.StoreTask{
|
||||
ID: req.ID,
|
||||
Org: orgID,
|
||||
User: userID,
|
||||
Name: op.Name,
|
||||
Script: newScript,
|
||||
}
|
||||
|
@ -286,10 +255,6 @@ func (s *Store) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (
|
|||
|
||||
// ListTasks lists the tasks based on a filter.
|
||||
func (s *Store) ListTasks(ctx context.Context, params backend.TaskSearchParams) ([]backend.StoreTaskWithMeta, error) {
|
||||
if params.Org.Valid() && params.User.Valid() {
|
||||
return nil, errors.New("ListTasks: org and user filters are mutually exclusive")
|
||||
}
|
||||
|
||||
if params.PageSize < 0 {
|
||||
return nil, errors.New("ListTasks: PageSize must be positive")
|
||||
}
|
||||
|
@ -316,16 +281,6 @@ func (s *Store) ListTasks(ctx context.Context, params backend.TaskSearchParams)
|
|||
return ErrNotFound
|
||||
}
|
||||
c = orgB.Cursor()
|
||||
} else if params.User.Valid() {
|
||||
encodedUser, err := params.User.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userB := b.Bucket(usersPath).Bucket(encodedUser)
|
||||
if userB == nil {
|
||||
return ErrNotFound
|
||||
}
|
||||
c = userB.Cursor()
|
||||
} else {
|
||||
c = b.Bucket(tasksPath).Cursor()
|
||||
}
|
||||
|
@ -375,36 +330,7 @@ func (s *Store) ListTasks(ctx context.Context, params backend.TaskSearchParams)
|
|||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
encodedID, err := taskIDs[i].Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tasks[i].Task.Org = params.Org
|
||||
var userID platform.ID
|
||||
if err := userID.Decode(b.Bucket(userByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
}
|
||||
tasks[i].Task.User = userID
|
||||
}
|
||||
}
|
||||
goto POPULATE_META
|
||||
}
|
||||
if params.User.Valid() {
|
||||
for i := range taskIDs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
encodedID, err := taskIDs[i].Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tasks[i].Task.User = params.User
|
||||
var orgID platform.ID
|
||||
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
}
|
||||
tasks[i].Task.Org = orgID
|
||||
}
|
||||
}
|
||||
goto POPULATE_META
|
||||
|
@ -419,12 +345,6 @@ func (s *Store) ListTasks(ctx context.Context, params backend.TaskSearchParams)
|
|||
return err
|
||||
}
|
||||
|
||||
var userID platform.ID
|
||||
if err := userID.Decode(b.Bucket(userByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
}
|
||||
tasks[i].Task.User = userID
|
||||
|
||||
var orgID platform.ID
|
||||
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
|
@ -463,7 +383,7 @@ func (s *Store) ListTasks(ctx context.Context, params backend.TaskSearchParams)
|
|||
|
||||
// FindTaskByID finds a task with a given an ID. It will return nil if the task does not exist.
|
||||
func (s *Store) FindTaskByID(ctx context.Context, id platform.ID) (*backend.StoreTask, error) {
|
||||
var userID, orgID platform.ID
|
||||
var orgID platform.ID
|
||||
var script, name string
|
||||
encodedID, err := id.Encode()
|
||||
if err != nil {
|
||||
|
@ -477,10 +397,6 @@ func (s *Store) FindTaskByID(ctx context.Context, id platform.ID) (*backend.Stor
|
|||
}
|
||||
script = string(scriptBytes)
|
||||
|
||||
if err := userID.Decode(b.Bucket(userByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -495,7 +411,6 @@ func (s *Store) FindTaskByID(ctx context.Context, id platform.ID) (*backend.Stor
|
|||
return &backend.StoreTask{
|
||||
ID: id,
|
||||
Org: orgID,
|
||||
User: userID,
|
||||
Name: name,
|
||||
Script: script,
|
||||
}, err
|
||||
|
@ -524,7 +439,7 @@ func (s *Store) FindTaskMetaByID(ctx context.Context, id platform.ID) (*backend.
|
|||
|
||||
func (s *Store) FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*backend.StoreTask, *backend.StoreTaskMeta, error) {
|
||||
var stmBytes []byte
|
||||
var userID, orgID platform.ID
|
||||
var orgID platform.ID
|
||||
var script, name string
|
||||
encodedID, err := id.Encode()
|
||||
if err != nil {
|
||||
|
@ -541,10 +456,6 @@ func (s *Store) FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*back
|
|||
// Assign copies of everything so we don't hold a stale reference to a bolt-maintained byte slice.
|
||||
stmBytes = append(stmBytes, b.Bucket(taskMetaPath).Get(encodedID)...)
|
||||
|
||||
if err := userID.Decode(b.Bucket(userByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -564,7 +475,6 @@ func (s *Store) FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*back
|
|||
return &backend.StoreTask{
|
||||
ID: id,
|
||||
Org: orgID,
|
||||
User: userID,
|
||||
Name: name,
|
||||
Script: script,
|
||||
}, &stm, nil
|
||||
|
@ -587,15 +497,6 @@ func (s *Store) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, e
|
|||
if err := b.Bucket(tasksPath).Delete(encodedID); err != nil {
|
||||
return err
|
||||
}
|
||||
user := b.Bucket(userByTaskID).Get(encodedID)
|
||||
if len(user) > 0 {
|
||||
if err := b.Bucket(usersPath).Bucket(user).Delete(encodedID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := b.Bucket(userByTaskID).Delete(encodedID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Bucket(nameByTaskID).Delete(encodedID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -722,70 +623,6 @@ func (s *Store) Close() error {
|
|||
return s.db.Close()
|
||||
}
|
||||
|
||||
// DeleteUser syncronously deletes a user and all their tasks from a bolt store.
|
||||
func (s *Store) DeleteUser(ctx context.Context, id platform.ID) error {
|
||||
userID, err := id.Encode()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucket)
|
||||
ub := b.Bucket(usersPath).Bucket(userID)
|
||||
if ub == nil {
|
||||
return backend.ErrUserNotFound
|
||||
}
|
||||
c := ub.Cursor()
|
||||
i := 0
|
||||
for k, _ := c.First(); k != nil; k, _ = c.Next() {
|
||||
i++
|
||||
// check for cancelation every 256 tasks deleted
|
||||
if i&0xFF == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
}
|
||||
if err := b.Bucket(tasksPath).Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Bucket(taskMetaPath).Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Bucket(orgByTaskID).Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Bucket(userByTaskID).Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Bucket(nameByTaskID).Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
org := b.Bucket(orgByTaskID).Get(k)
|
||||
if len(org) > 0 {
|
||||
ob := b.Bucket(orgsPath).Bucket(org)
|
||||
if ob != nil {
|
||||
if err := ob.Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check for cancelation one last time before we return
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
return b.Bucket(usersPath).DeleteBucket(userID)
|
||||
}
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteOrg syncronously deletes an org and all their tasks from a bolt store.
|
||||
func (s *Store) DeleteOrg(ctx context.Context, id platform.ID) error {
|
||||
orgID, err := id.Encode()
|
||||
|
@ -820,21 +657,9 @@ func (s *Store) DeleteOrg(ctx context.Context, id platform.ID) error {
|
|||
if err := b.Bucket(orgByTaskID).Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Bucket(userByTaskID).Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.Bucket(nameByTaskID).Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
user := b.Bucket(userByTaskID).Get(k)
|
||||
if len(user) > 0 {
|
||||
ub := b.Bucket(usersPath).Bucket(user)
|
||||
if ub != nil {
|
||||
if err := ub.Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// check for cancelation one last time before we return
|
||||
select {
|
||||
|
|
|
@ -148,23 +148,6 @@ func (c *Coordinator) DeleteOrg(ctx context.Context, orgID platform.ID) error {
|
|||
return c.Store.DeleteOrg(ctx, orgID)
|
||||
}
|
||||
|
||||
func (c *Coordinator) DeleteUser(ctx context.Context, userID platform.ID) error {
|
||||
userTasks, err := c.Store.ListTasks(ctx, backend.TaskSearchParams{
|
||||
User: userID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, userTask := range userTasks {
|
||||
if err := c.sch.ReleaseTask(userTask.Task.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return c.Store.DeleteUser(ctx, userID)
|
||||
}
|
||||
|
||||
func (c *Coordinator) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
|
||||
return c.sch.CancelRun(ctx, taskID, runID)
|
||||
}
|
||||
|
|
|
@ -47,8 +47,7 @@ func (s *inmem) CreateTask(_ context.Context, req CreateTaskRequest) (platform.I
|
|||
task := StoreTask{
|
||||
ID: id,
|
||||
|
||||
Org: req.Org,
|
||||
User: req.User,
|
||||
Org: req.Org,
|
||||
|
||||
Name: o.Name,
|
||||
|
||||
|
@ -128,10 +127,6 @@ func (s *inmem) UpdateTask(_ context.Context, req UpdateTaskRequest) (UpdateTask
|
|||
}
|
||||
|
||||
func (s *inmem) ListTasks(_ context.Context, params TaskSearchParams) ([]StoreTaskWithMeta, error) {
|
||||
if params.Org.Valid() && params.User.Valid() {
|
||||
return nil, errors.New("ListTasks: org and user filters are mutually exclusive")
|
||||
}
|
||||
|
||||
if params.PageSize < 0 {
|
||||
return nil, errors.New("ListTasks: PageSize must be positive")
|
||||
}
|
||||
|
@ -147,7 +142,6 @@ func (s *inmem) ListTasks(_ context.Context, params TaskSearchParams) ([]StoreTa
|
|||
out := make([]StoreTaskWithMeta, 0, lim)
|
||||
|
||||
org := params.Org
|
||||
user := params.User
|
||||
|
||||
var after platform.ID
|
||||
if !params.After.Valid() {
|
||||
|
@ -166,9 +160,6 @@ func (s *inmem) ListTasks(_ context.Context, params TaskSearchParams) ([]StoreTa
|
|||
if org.Valid() && org != t.Org {
|
||||
continue
|
||||
}
|
||||
if user.Valid() && user != t.User {
|
||||
continue
|
||||
}
|
||||
|
||||
out = append(out, StoreTaskWithMeta{Task: t})
|
||||
if len(out) >= lim {
|
||||
|
@ -357,10 +348,6 @@ func (s *inmem) delete(ctx context.Context, id platform.ID, f func(StoreTask) pl
|
|||
return nil
|
||||
}
|
||||
|
||||
func getUser(st StoreTask) platform.ID {
|
||||
return st.User
|
||||
}
|
||||
|
||||
func getOrg(st StoreTask) platform.ID {
|
||||
return st.Org
|
||||
}
|
||||
|
@ -369,8 +356,3 @@ func getOrg(st StoreTask) platform.ID {
|
|||
func (s *inmem) DeleteOrg(ctx context.Context, id platform.ID) error {
|
||||
return s.delete(ctx, id, getOrg)
|
||||
}
|
||||
|
||||
// DeleteUser synchronously deletes a user and all their tasks from a from an in-mem store store.
|
||||
func (s *inmem) DeleteUser(ctx context.Context, id platform.ID) error {
|
||||
return s.delete(ctx, id, getUser)
|
||||
}
|
||||
|
|
|
@ -297,9 +297,6 @@ type Store interface {
|
|||
// DeleteOrg deletes the org.
|
||||
DeleteOrg(ctx context.Context, orgID platform.ID) error
|
||||
|
||||
// DeleteUser deletes a user with userID.
|
||||
DeleteUser(ctx context.Context, userID platform.ID) error
|
||||
|
||||
// Close closes the store for usage and cleans up running processes.
|
||||
Close() error
|
||||
}
|
||||
|
|
|
@ -48,7 +48,6 @@ func NewStoreTest(name string, cf CreateStoreFunc, df DestroyStoreFunc, funcName
|
|||
"FinishRun": testStoreFinishRun,
|
||||
"ManuallyRunTimeRange": testStoreManuallyRunTimeRange,
|
||||
"DeleteOrg": testStoreDeleteOrg,
|
||||
"DeleteUser": testStoreDeleteUser,
|
||||
}
|
||||
|
||||
return func(t *testing.T) {
|
||||
|
@ -341,14 +340,6 @@ from(bucket:"test") |> range(start:-1h)`
|
|||
t.Fatalf("expected no results for bad org ID, got %d result(s)", len(ts))
|
||||
}
|
||||
|
||||
ts, err = s.ListTasks(context.Background(), backend.TaskSearchParams{User: platform.ID(123)})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(ts) > 0 {
|
||||
t.Fatalf("expected no results for bad user ID, got %d result(s)", len(ts))
|
||||
}
|
||||
|
||||
newID, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: userID, AuthorizationID: authzID, Script: fmt.Sprintf(scriptFmt, 1), Status: backend.TaskInactive})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -432,10 +423,6 @@ from(bucket:"test") |> range(start:-1h)`
|
|||
t.Fatalf("task org mismatch at index %d: got %x, expected %x", i, g.Task.Org, orgID)
|
||||
}
|
||||
|
||||
if userID != g.Task.User {
|
||||
t.Fatalf("task user mismatch at index %d: got %x, expected %x", i, g.Task.User, userID)
|
||||
}
|
||||
|
||||
if tasks[i].name != g.Task.Name {
|
||||
t.Fatalf("task name mismatch at index %d: got %q, expected %q", i, g.Task.Name, tasks[i].name)
|
||||
}
|
||||
|
@ -458,10 +445,6 @@ from(bucket:"test") |> range(start:-1h)`
|
|||
if _, err := s.ListTasks(context.Background(), backend.TaskSearchParams{PageSize: math.MaxInt32}); err == nil {
|
||||
t.Fatal("expected error for huge page size but got nil")
|
||||
}
|
||||
|
||||
if _, err := s.ListTasks(context.Background(), backend.TaskSearchParams{Org: platform.ID(1), User: platform.ID(2)}); err == nil {
|
||||
t.Fatal("expected error when specifying both org and user, but got nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -497,9 +480,6 @@ from(bucket:"test") |> range(start:-1h)`
|
|||
if task.Org != org {
|
||||
t.Fatalf("unexpected org: got %v, exp %v", task.Org, org)
|
||||
}
|
||||
if task.User != user {
|
||||
t.Fatalf("unexpected user: got %v, exp %v", task.User, user)
|
||||
}
|
||||
if task.Name != "a task" {
|
||||
t.Fatalf("unexpected name %q", task.Name)
|
||||
}
|
||||
|
@ -704,9 +684,6 @@ from(bucket:"test") |> range(start:-1h)`
|
|||
if task.Org != org {
|
||||
t.Fatalf("unexpected org: got %v, exp %v", task.Org, org)
|
||||
}
|
||||
if task.User != user {
|
||||
t.Fatalf("unexpected user: got %v, exp %v", task.User, user)
|
||||
}
|
||||
if task.Name != "a task" {
|
||||
t.Fatalf("unexpected name %q", task.Name)
|
||||
}
|
||||
|
@ -1016,30 +993,6 @@ from(bucket:"test") |> range(start:-1h)`
|
|||
}
|
||||
}
|
||||
|
||||
func testStoreDeleteUser(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) {
|
||||
s := create(t)
|
||||
defer destroy(t, s)
|
||||
ids := createABunchOFTasks(t, s,
|
||||
func(u, _ uint64) bool {
|
||||
return u == 1
|
||||
},
|
||||
)
|
||||
user := platform.ID(1)
|
||||
err := s.DeleteUser(context.Background(), user)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := range ids {
|
||||
task, err := s.FindTaskByID(context.Background(), ids[i])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if task != nil {
|
||||
t.Fatal("expected task to be deleted but it was not")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testStoreDeleteOrg(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) {
|
||||
s := create(t)
|
||||
defer destroy(t, s)
|
||||
|
|
|
@ -18,8 +18,8 @@ type RunController interface {
|
|||
}
|
||||
|
||||
// PlatformAdapter wraps a task.Store into the platform.TaskService interface.
|
||||
func PlatformAdapter(s backend.Store, r backend.LogReader, rc RunController, as platform.AuthorizationService) platform.TaskService {
|
||||
return pAdapter{s: s, r: r, rc: rc, as: as}
|
||||
func PlatformAdapter(s backend.Store, r backend.LogReader, rc RunController, as platform.AuthorizationService, urm platform.UserResourceMappingService) platform.TaskService {
|
||||
return pAdapter{s: s, r: r, rc: rc, as: as, urm: urm}
|
||||
}
|
||||
|
||||
type pAdapter struct {
|
||||
|
@ -28,7 +28,8 @@ type pAdapter struct {
|
|||
r backend.LogReader
|
||||
|
||||
// Needed to look up authorization ID from token during create.
|
||||
as platform.AuthorizationService
|
||||
as platform.AuthorizationService
|
||||
urm platform.UserResourceMappingService
|
||||
}
|
||||
|
||||
var _ platform.TaskService = pAdapter{}
|
||||
|
@ -53,8 +54,35 @@ func (p pAdapter) FindTasks(ctx context.Context, filter platform.TaskFilter) ([]
|
|||
params.Org = *filter.Organization
|
||||
}
|
||||
if filter.User != nil {
|
||||
params.User = *filter.User
|
||||
ownedTasks, _, err := p.urm.FindUserResourceMappings(
|
||||
ctx,
|
||||
platform.UserResourceMappingFilter{
|
||||
UserID: *filter.User,
|
||||
UserType: platform.Owner,
|
||||
ResourceType: platform.TasksResourceType,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
var tasks []*platform.Task
|
||||
for _, ownedTask := range ownedTasks {
|
||||
storeTask, meta, err := p.s.FindTaskByIDWithMeta(ctx, ownedTask.ResourceID)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
task, err := toPlatformTask(*storeTask, meta)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
tasks = append(tasks, task)
|
||||
|
||||
}
|
||||
return tasks, len(tasks), nil
|
||||
}
|
||||
|
||||
if filter.After != nil {
|
||||
params.After = *filter.After
|
||||
}
|
||||
|
@ -126,6 +154,23 @@ func (p pAdapter) CreateTask(ctx context.Context, t platform.TaskCreate) (*platf
|
|||
task.Offset = opts.Offset.String()
|
||||
}
|
||||
|
||||
mapping := &platform.UserResourceMapping{
|
||||
UserID: auth.GetUserID(),
|
||||
UserType: platform.Owner,
|
||||
ResourceType: platform.TasksResourceType,
|
||||
ResourceID: task.ID,
|
||||
}
|
||||
|
||||
if err := p.urm.CreateUserResourceMapping(ctx, mapping); err != nil {
|
||||
// clean up the task if we fail to map the user and resource
|
||||
// TODO(lh): Multi step creates could benefit from a service wide transactional request
|
||||
if derr := p.DeleteTask(ctx, task.ID); derr != nil {
|
||||
err = fmt.Errorf("%s: failed to clean up task: %s", err.Error(), derr.Error())
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
|
@ -161,8 +206,27 @@ func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.T
|
|||
|
||||
func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||
_, err := p.s.DeleteTask(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO(mr): Store.DeleteTask returns false, nil if ID didn't match; do we want to handle that case?
|
||||
return err
|
||||
|
||||
// clean up resource maps for deleted task
|
||||
urms, _, err := p.urm.FindUserResourceMappings(ctx, platform.UserResourceMappingFilter{
|
||||
ResourceID: id,
|
||||
ResourceType: platform.TasksResourceType,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, m := range urms {
|
||||
if err := p.urm.DeleteUserResourceMapping(ctx, m.ResourceID, m.UserID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
||||
|
|
|
@ -43,7 +43,7 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory) {
|
|||
sys, cancel := fn(t)
|
||||
defer cancel()
|
||||
if sys.TaskServiceFunc == nil {
|
||||
sys.ts = task.PlatformAdapter(sys.S, sys.LR, sys.Sch, sys.I)
|
||||
sys.ts = task.PlatformAdapter(sys.S, sys.LR, sys.Sch, sys.I, sys.I)
|
||||
} else {
|
||||
sys.ts = sys.TaskServiceFunc()
|
||||
}
|
||||
|
@ -139,11 +139,11 @@ func testTaskCRUD(t *testing.T, sys *System) {
|
|||
|
||||
authorizedCtx := icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
|
||||
|
||||
task, err := sys.ts.CreateTask(authorizedCtx, tc)
|
||||
tsk, err := sys.ts.CreateTask(authorizedCtx, tc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !task.ID.Valid() {
|
||||
if !tsk.ID.Valid() {
|
||||
t.Fatal("no task ID set")
|
||||
}
|
||||
|
||||
|
@ -160,11 +160,11 @@ func testTaskCRUD(t *testing.T, sys *System) {
|
|||
// Look up a task the different ways we can.
|
||||
// Map of method name to found task.
|
||||
found := map[string]*platform.Task{
|
||||
"Created": task,
|
||||
"Created": tsk,
|
||||
}
|
||||
|
||||
// Find by ID should return the right task.
|
||||
f, err := sys.ts.FindTaskByID(sys.Ctx, task.ID)
|
||||
f, err := sys.ts.FindTaskByID(sys.Ctx, tsk.ID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -174,15 +174,14 @@ func testTaskCRUD(t *testing.T, sys *System) {
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f = findTask(fs, task.ID)
|
||||
f = findTask(fs, tsk.ID)
|
||||
found["FindTasks with Organization filter"] = f
|
||||
|
||||
fs, _, err = sys.ts.FindTasks(sys.Ctx, platform.TaskFilter{User: &cr.UserID})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
f = findTask(fs, task.ID)
|
||||
f = findTask(fs, tsk.ID)
|
||||
found["FindTasks with User filter"] = f
|
||||
|
||||
for fn, f := range found {
|
||||
|
|
|
@ -8757,7 +8757,7 @@
|
|||
"dependencies": {
|
||||
"minimist": {
|
||||
"version": "0.0.10",
|
||||
"resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.10.tgz",
|
||||
"resolved": "http://registry.npmjs.org/minimist/-/minimist-0.0.10.tgz",
|
||||
"integrity": "sha1-3j+YVD2/lggr5IrRoMfNqDYwHc8=",
|
||||
"dev": true
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue