From b8f5604613cf6ec7e2de5406e72b0141d20c2462 Mon Sep 17 00:00:00 2001 From: George Date: Fri, 10 Jul 2020 11:05:07 +0100 Subject: [PATCH] revert(tasks): Add new permission lookup pattern for executor (#18869) (#18915) This reverts commit e9e4d794cf25370cdc2aea8a066ec3d438ff2879. --- authorizer/user.go | 7 - cmd/influxd/launcher/launcher.go | 2 +- http/user_service.go | 7 - kv/duplicate_read_test.go | 105 ++++++ kv/user.go | 7 - mock/user_service.go | 18 +- task/backend/executor/executor.go | 30 +- task/backend/executor/executor_test.go | 9 +- .../executor/mock/permission_service.go | 127 ------- tenant/duplicate_reads.go | 344 ++++++++++++++++++ tenant/http_client_user.go | 13 - tenant/http_server_user.go | 26 -- tenant/middleware_user_auth.go | 7 - tenant/middleware_user_logging.go | 13 - tenant/middleware_user_metrics.go | 6 - tenant/service_urm.go | 16 - tenant/service_user.go | 32 -- tenant/service_user_test.go | 75 ---- user.go | 3 - user_resource_mapping.go | 47 +-- 20 files changed, 476 insertions(+), 418 deletions(-) create mode 100644 kv/duplicate_read_test.go delete mode 100644 task/backend/executor/mock/permission_service.go create mode 100644 tenant/duplicate_reads.go diff --git a/authorizer/user.go b/authorizer/user.go index 1c8904094a..29c1b86d3f 100644 --- a/authorizer/user.go +++ b/authorizer/user.go @@ -75,10 +75,3 @@ func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error { } return s.s.DeleteUser(ctx, id) } - -func (s *UserService) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) { - return nil, &influxdb.Error{ - Code: influxdb.EInternal, - Msg: "not implemented", - } -} diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index fc1bb0392b..3405711585 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -752,7 +752,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { executor, executorMetrics := executor.NewExecutor( m.log.With(zap.String("service", "task-executor")), query.QueryServiceBridge{AsyncQueryService: m.queryController}, - ts.UserSvc, + authSvc, combinedTaskService, combinedTaskService, ) diff --git a/http/user_service.go b/http/user_service.go index da11a3ba22..5912885770 100644 --- a/http/user_service.go +++ b/http/user_service.go @@ -596,13 +596,6 @@ func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error { Do(ctx) } -func (s *UserService) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) { - return nil, &influxdb.Error{ - Code: influxdb.EInternal, - Msg: "not implemented", - } -} - // PasswordService is an http client to speak to the password service. type PasswordService struct { Client *httpc.Client diff --git a/kv/duplicate_read_test.go b/kv/duplicate_read_test.go new file mode 100644 index 0000000000..2272a753ac --- /dev/null +++ b/kv/duplicate_read_test.go @@ -0,0 +1,105 @@ +package kv_test + +import ( + "testing" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/tenant" + influxdbtesting "github.com/influxdata/influxdb/v2/testing" + "go.uber.org/zap/zaptest" +) + +func initBoltDuplicateReadBucketService(f influxdbtesting.BucketFields, t *testing.T) (influxdb.BucketService, string, func()) { + s, closeInMem, err := NewTestInmemStore(t) + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + svc, op, closeSvc := initBucketService(s, f, t) + ro := tenant.NewReadOnlyStore(s) + newSvc := tenant.NewService(ro) + svc = tenant.NewDuplicateReadBucketService(zaptest.NewLogger(t), svc, newSvc) + return svc, op, func() { + closeSvc() + closeInMem() + } +} + +func TestBoltDuplicateReadBucketService(t *testing.T) { + influxdbtesting.BucketService(initBoltDuplicateReadBucketService, t) +} + +func initBoltDuplicateReadOrganizationService(f influxdbtesting.OrganizationFields, t *testing.T) (influxdb.OrganizationService, string, func()) { + s, closeBolt, err := NewTestBoltStore(t) + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + svc, op, closeSvc := initOrganizationService(s, f, t) + ro := tenant.NewReadOnlyStore(s) + newSvc := tenant.NewService(ro) + svc = tenant.NewDuplicateReadOrganizationService(zaptest.NewLogger(t), svc, newSvc) + return svc, op, func() { + closeSvc() + closeBolt() + } +} + +func TestBoltDuplicateReadOrganizationService(t *testing.T) { + influxdbtesting.OrganizationService(initBoltDuplicateReadOrganizationService, t) +} + +func initBoltDuplicateReadUserResourceMappingService(f influxdbtesting.UserResourceFields, t *testing.T) (influxdb.UserResourceMappingService, func()) { + s, closeBolt, err := NewTestBoltStore(t) + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + svc, closeSvc := initUserResourceMappingService(s, f, t) + ro := tenant.NewReadOnlyStore(s) + newSvc := tenant.NewService(ro) + svc = tenant.NewDuplicateReadUserResourceMappingService(zaptest.NewLogger(t), svc, newSvc) + return svc, func() { + closeSvc() + closeBolt() + } +} + +func TestBoltDuplicateReadUserResourceMappingService(t *testing.T) { + influxdbtesting.UserResourceMappingService(initBoltDuplicateReadUserResourceMappingService, t) +} + +func initBoltDuplicateReadUserService(f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) { + s, closeBolt, err := NewTestBoltStore(t) + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + svc, op, closeSvc := initUserService(s, f, t) + ro := tenant.NewReadOnlyStore(s) + newSvc := tenant.NewService(ro) + svc = tenant.NewDuplicateReadUserService(zaptest.NewLogger(t), svc, newSvc) + return svc, op, func() { + closeSvc() + closeBolt() + } +} + +func TestBoltDuplicateReadUserService(t *testing.T) { + influxdbtesting.UserService(initBoltDuplicateReadUserService, t) +} + +func initBoltDuplicateReadPasswordsService(f influxdbtesting.PasswordFields, t *testing.T) (influxdb.PasswordsService, func()) { + s, closeStore, err := NewTestBoltStore(t) + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + svc, closeSvc := initPasswordsService(s, f, t) + ro := tenant.NewReadOnlyStore(s) + newSvc := tenant.NewService(ro) + svc = tenant.NewDuplicateReadPasswordsService(zaptest.NewLogger(t), svc, newSvc) + return svc, func() { + closeSvc() + closeStore() + } +} + +func TestBoltDuplicateReadPasswordService(t *testing.T) { + influxdbtesting.PasswordsService(initBoltDuplicateReadPasswordsService, t) +} diff --git a/kv/user.go b/kv/user.go index 72e4c7ad51..02a06bf468 100644 --- a/kv/user.go +++ b/kv/user.go @@ -442,13 +442,6 @@ func (s *Service) deleteUser(ctx context.Context, tx Tx, id influxdb.ID) error { return nil } -func (s *Service) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) { - return nil, &influxdb.Error{ - Code: influxdb.EInternal, - Msg: "not implemented", - } -} - func (s *Service) deleteUsersAuthorizations(ctx context.Context, tx Tx, id influxdb.ID) error { authFilter := influxdb.AuthorizationFilter{ UserID: &id, diff --git a/mock/user_service.go b/mock/user_service.go index f6ba710923..879fc8bbf0 100644 --- a/mock/user_service.go +++ b/mock/user_service.go @@ -12,13 +12,12 @@ var _ platform.UserService = (*UserService)(nil) // also makes it a suitable mock to use wherever an platform.UserService is required. type UserService struct { // Methods for a platform.UserService - FindUserByIDFn func(context.Context, platform.ID) (*platform.User, error) - FindUsersFn func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error) - CreateUserFn func(context.Context, *platform.User) error - DeleteUserFn func(context.Context, platform.ID) error - FindUserFn func(context.Context, platform.UserFilter) (*platform.User, error) - UpdateUserFn func(context.Context, platform.ID, platform.UserUpdate) (*platform.User, error) - FindPermissionForUserFn func(context.Context, platform.ID) (platform.PermissionSet, error) + FindUserByIDFn func(context.Context, platform.ID) (*platform.User, error) + FindUsersFn func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error) + CreateUserFn func(context.Context, *platform.User) error + DeleteUserFn func(context.Context, platform.ID) error + FindUserFn func(context.Context, platform.UserFilter) (*platform.User, error) + UpdateUserFn func(context.Context, platform.ID, platform.UserUpdate) (*platform.User, error) } // NewUserService returns a mock of UserService where its methods will return zero values. @@ -32,7 +31,6 @@ func NewUserService() *UserService { FindUsersFn: func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error) { return nil, 0, nil }, - FindPermissionForUserFn: func(context.Context, platform.ID) (platform.PermissionSet, error) { return nil, nil }, } } @@ -65,7 +63,3 @@ func (s *UserService) FindUser(ctx context.Context, filter platform.UserFilter) func (s *UserService) UpdateUser(ctx context.Context, id platform.ID, upd platform.UserUpdate) (*platform.User, error) { return s.UpdateUserFn(ctx, id, upd) } - -func (s *UserService) FindPermissionForUser(ctx context.Context, uid platform.ID) (platform.PermissionSet, error) { - return s.FindPermissionForUserFn(ctx, uid) -} diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index 2a103c55d9..283e76f287 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -25,10 +25,6 @@ const ( var _ scheduler.Executor = (*Executor)(nil) -type PermissionService interface { - FindPermissionForUser(ctx context.Context, UserID influxdb.ID) (influxdb.PermissionSet, error) -} - type Promise interface { ID() influxdb.ID Cancel(ctx context.Context) @@ -88,7 +84,7 @@ func WithNonSystemCompilerBuilder(builder CompilerBuilderFunc) executorOption { } // NewExecutor creates a new task executor -func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) { +func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) { cfg := &executorConfig{ maxWorkers: defaultMaxWorkers, systemBuildCompiler: NewASTCompiler, @@ -103,7 +99,7 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, t ts: ts, tcs: tcs, qs: qs, - ps: us, + as: as, currentPromises: sync.Map{}, promiseQueue: make(chan *promise, maxPromises), @@ -130,7 +126,7 @@ type Executor struct { tcs backend.TaskControlService qs query.QueryService - ps PermissionService + as influxdb.AuthorizationService metrics *ExecutorMetrics @@ -280,24 +276,16 @@ func (e *Executor) createPromise(ctx context.Context, run *influxdb.Run) (*promi if err != nil { return nil, err } - - perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID) - if err != nil { - return nil, err + if !t.Authorization.GetUserID().Valid() { + t.Authorization.UserID = t.OwnerID } ctx, cancel := context.WithCancel(ctx) // create promise p := &promise{ - run: run, - task: t, - auth: &influxdb.Authorization{ - Status: influxdb.Active, - UserID: t.OwnerID, - ID: influxdb.ID(1), - OrgID: t.OrganizationID, - Permissions: perm, - }, + run: run, + task: t, + auth: t.Authorization, createdAt: time.Now().UTC(), done: make(chan struct{}), ctx: ctx, @@ -454,7 +442,7 @@ func (w *worker) executeQuery(p *promise) { // start w.start(p) - ctx = icontext.SetAuthorizer(ctx, p.auth) + ctx = icontext.SetAuthorizer(ctx, p.task.Authorization) buildCompiler := w.systemBuildCompiler if p.task.Type != influxdb.TaskSystemType { diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 4a1b6b97fc..72e0e19e7b 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - "github.com/golang/mock/gomock" "github.com/influxdata/flux" "github.com/influxdata/influxdb/v2" icontext "github.com/influxdata/influxdb/v2/context" @@ -23,7 +22,6 @@ import ( "github.com/influxdata/influxdb/v2/query" "github.com/influxdata/influxdb/v2/query/fluxlang" "github.com/influxdata/influxdb/v2/task/backend" - "github.com/influxdata/influxdb/v2/task/backend/executor/mock" "github.com/influxdata/influxdb/v2/task/backend/scheduler" "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go" @@ -64,16 +62,13 @@ func taskExecutorSystem(t *testing.T) tes { if err := all.Up(ctx, logger, store); err != nil { t.Fatal(err) } - ctrl := gomock.NewController(t) - ps := mock.NewMockPermissionService(ctrl) - ps.EXPECT().FindPermissionForUser(gomock.Any(), gomock.Any()).Return(influxdb.PermissionSet{}, nil).AnyTimes() + var ( svc = kv.NewService(logger, store, kv.ServiceConfig{ FluxLanguageService: fluxlang.DefaultService, }) - tcs = &taskControlService{TaskControlService: svc} - ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, ps, svc, tcs) + ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, svc, svc, tcs) ) return tes{ svc: aqs, diff --git a/task/backend/executor/mock/permission_service.go b/task/backend/executor/mock/permission_service.go deleted file mode 100644 index 91f79ddca2..0000000000 --- a/task/backend/executor/mock/permission_service.go +++ /dev/null @@ -1,127 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: executor.go - -// Package mock is a generated GoMock package. -package mock - -import ( - context "context" - gomock "github.com/golang/mock/gomock" - influxdb "github.com/influxdata/influxdb/v2" - reflect "reflect" -) - -// MockPermissionService is a mock of PermissionService interface -type MockPermissionService struct { - ctrl *gomock.Controller - recorder *MockPermissionServiceMockRecorder -} - -// MockPermissionServiceMockRecorder is the mock recorder for MockPermissionService -type MockPermissionServiceMockRecorder struct { - mock *MockPermissionService -} - -// NewMockPermissionService creates a new mock instance -func NewMockPermissionService(ctrl *gomock.Controller) *MockPermissionService { - mock := &MockPermissionService{ctrl: ctrl} - mock.recorder = &MockPermissionServiceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockPermissionService) EXPECT() *MockPermissionServiceMockRecorder { - return m.recorder -} - -// FindPermissionForUser mocks base method -func (m *MockPermissionService) FindPermissionForUser(ctx context.Context, UserID influxdb.ID) (influxdb.PermissionSet, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FindPermissionForUser", ctx, UserID) - ret0, _ := ret[0].(influxdb.PermissionSet) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// FindPermissionForUser indicates an expected call of FindPermissionForUser -func (mr *MockPermissionServiceMockRecorder) FindPermissionForUser(ctx, UserID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindPermissionForUser", reflect.TypeOf((*MockPermissionService)(nil).FindPermissionForUser), ctx, UserID) -} - -// MockPromise is a mock of Promise interface -type MockPromise struct { - ctrl *gomock.Controller - recorder *MockPromiseMockRecorder -} - -// MockPromiseMockRecorder is the mock recorder for MockPromise -type MockPromiseMockRecorder struct { - mock *MockPromise -} - -// NewMockPromise creates a new mock instance -func NewMockPromise(ctrl *gomock.Controller) *MockPromise { - mock := &MockPromise{ctrl: ctrl} - mock.recorder = &MockPromiseMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockPromise) EXPECT() *MockPromiseMockRecorder { - return m.recorder -} - -// ID mocks base method -func (m *MockPromise) ID() influxdb.ID { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ID") - ret0, _ := ret[0].(influxdb.ID) - return ret0 -} - -// ID indicates an expected call of ID -func (mr *MockPromiseMockRecorder) ID() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockPromise)(nil).ID)) -} - -// Cancel mocks base method -func (m *MockPromise) Cancel(ctx context.Context) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Cancel", ctx) -} - -// Cancel indicates an expected call of Cancel -func (mr *MockPromiseMockRecorder) Cancel(ctx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cancel", reflect.TypeOf((*MockPromise)(nil).Cancel), ctx) -} - -// Done mocks base method -func (m *MockPromise) Done() <-chan struct{} { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Done") - ret0, _ := ret[0].(<-chan struct{}) - return ret0 -} - -// Done indicates an expected call of Done -func (mr *MockPromiseMockRecorder) Done() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockPromise)(nil).Done)) -} - -// Error mocks base method -func (m *MockPromise) Error() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Error") - ret0, _ := ret[0].(error) - return ret0 -} - -// Error indicates an expected call of Error -func (mr *MockPromiseMockRecorder) Error() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockPromise)(nil).Error)) -} diff --git a/tenant/duplicate_reads.go b/tenant/duplicate_reads.go new file mode 100644 index 0000000000..c48fc495c1 --- /dev/null +++ b/tenant/duplicate_reads.go @@ -0,0 +1,344 @@ +package tenant + +import ( + "context" + "fmt" + "sort" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" + "go.uber.org/zap" +) + +var _ influxdb.TenantService = (*tenantService)(nil) + +var bucketCmpOptions = cmp.Options{ + cmp.Transformer("Sort", func(in []*influxdb.Bucket) []*influxdb.Bucket { + out := append([]*influxdb.Bucket(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].Name > out[j].Name + }) + return out + }), +} + +// readOnlyStore is a wrapper for kv.Store that ensures that updates are not applied. +type readOnlyStore struct { + kv.Store +} + +func (r readOnlyStore) Update(context.Context, func(kv.Tx) error) error { + return nil +} + +type tenantService struct { + log *zap.Logger + + oldBucketSvc influxdb.BucketService + newBucketSvc influxdb.BucketService + oldOrgSvc influxdb.OrganizationService + newOrgSvc influxdb.OrganizationService + oldURMSvc influxdb.UserResourceMappingService + newURMSvc influxdb.UserResourceMappingService + oldUserSvc influxdb.UserService + newUserSvc influxdb.UserService + oldPwdSvc influxdb.PasswordsService + newPwdSvc influxdb.PasswordsService +} + +// NewReadOnlyStore returns a Store that cannot update the underlying kv.Store. +func NewReadOnlyStore(store kv.Store) *Store { + return NewStore(readOnlyStore{Store: store}) +} + +// NewDuplicateReadTenantService returns a tenant service that duplicates the reads to oldSvc and newSvc. +// The foreseen use case is to compare two service versions, an old one and a new one. +// The resulting influxdb.TenantService: +// - forwards writes to the old service; +// - reads from the old one, if no error is encountered, it reads from the new one; +// - compares the results obtained and logs the difference, if any. +func NewDuplicateReadTenantService(log *zap.Logger, oldSvc, newSvc influxdb.TenantService) influxdb.TenantService { + return tenantService{ + log: log, + + oldBucketSvc: oldSvc, + oldOrgSvc: oldSvc, + oldURMSvc: oldSvc, + oldUserSvc: oldSvc, + oldPwdSvc: oldSvc, + + newBucketSvc: newSvc, + newOrgSvc: newSvc, + newURMSvc: newSvc, + newUserSvc: newSvc, + newPwdSvc: newSvc, + } +} + +// NewDuplicateReadBucketService returns a service that duplicates the reads to oldSvc and newSvc. +func NewDuplicateReadBucketService(log *zap.Logger, oldSvc, newSvc influxdb.BucketService) influxdb.BucketService { + return tenantService{ + log: log, + + oldBucketSvc: oldSvc, + newBucketSvc: newSvc, + } +} + +// NewDuplicateReadOrganizationService returns a service that duplicates the reads to oldSvc and newSvc. +func NewDuplicateReadOrganizationService(log *zap.Logger, oldSvc, newSvc influxdb.OrganizationService) influxdb.OrganizationService { + return tenantService{ + log: log, + + oldOrgSvc: oldSvc, + newOrgSvc: newSvc, + } +} + +// NewDuplicateReadUserResourceMappingService returns a service that duplicates the reads to oldSvc and newSvc. +func NewDuplicateReadUserResourceMappingService(log *zap.Logger, oldSvc, newSvc influxdb.UserResourceMappingService) influxdb.UserResourceMappingService { + return tenantService{ + log: log, + + oldURMSvc: oldSvc, + newURMSvc: newSvc, + } +} + +// NewDuplicateReadUserService returns a service that duplicates the reads to oldSvc and newSvc. +func NewDuplicateReadUserService(log *zap.Logger, oldSvc, newSvc influxdb.UserService) influxdb.UserService { + return tenantService{ + log: log, + + oldUserSvc: oldSvc, + newUserSvc: newSvc, + } +} + +// NewDuplicateReadPasswordsService returns a service that duplicates the reads to oldSvc and newSvc. +func NewDuplicateReadPasswordsService(log *zap.Logger, oldSvc, newSvc influxdb.PasswordsService) influxdb.PasswordsService { + return tenantService{ + log: log, + + oldPwdSvc: oldSvc, + newPwdSvc: newSvc, + } +} + +func (s tenantService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) { + o, err := s.oldBucketSvc.FindBucketByID(ctx, id) + if err != nil { + return o, err + } + n, err := s.newBucketSvc.FindBucketByID(ctx, id) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByID")) + } + return o, nil +} + +func (s tenantService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) { + o, err := s.oldBucketSvc.FindBucketByName(ctx, orgID, name) + if err != nil { + return o, err + } + n, err := s.newBucketSvc.FindBucketByName(ctx, orgID, name) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByName")) + } + return o, nil +} + +func (s tenantService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) { + o, err := s.oldBucketSvc.FindBucket(ctx, filter) + if err != nil { + return o, err + } + n, err := s.newBucketSvc.FindBucket(ctx, filter) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucket")) + } + return o, nil +} + +func (s tenantService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) { + o, no, err := s.oldBucketSvc.FindBuckets(ctx, filter, opt...) + if err != nil { + return o, no, err + } + n, _, err := s.newBucketSvc.FindBuckets(ctx, filter, opt...) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n, bucketCmpOptions); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBuckets")) + } + return o, no, nil +} + +func (s tenantService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error { + return s.oldBucketSvc.CreateBucket(ctx, b) +} + +func (s tenantService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) { + return s.oldBucketSvc.UpdateBucket(ctx, id, upd) +} + +func (s tenantService) DeleteBucket(ctx context.Context, id influxdb.ID) error { + return s.oldBucketSvc.DeleteBucket(ctx, id) +} + +func (s tenantService) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) { + o, err := s.oldOrgSvc.FindOrganizationByID(ctx, id) + if err != nil { + return o, err + } + n, err := s.newOrgSvc.FindOrganizationByID(ctx, id) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizationByID")) + } + return o, nil +} + +func (s tenantService) FindOrganization(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) { + o, err := s.oldOrgSvc.FindOrganization(ctx, filter) + if err != nil { + return o, err + } + n, err := s.newOrgSvc.FindOrganization(ctx, filter) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganization")) + } + return o, nil +} + +func (s tenantService) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) { + o, no, err := s.oldOrgSvc.FindOrganizations(ctx, filter, opt...) + if err != nil { + return o, no, err + } + n, _, err := s.newOrgSvc.FindOrganizations(ctx, filter, opt...) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizations")) + } + return o, no, nil +} + +func (s tenantService) CreateOrganization(ctx context.Context, b *influxdb.Organization) error { + return s.oldOrgSvc.CreateOrganization(ctx, b) +} + +func (s tenantService) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) { + return s.oldOrgSvc.UpdateOrganization(ctx, id, upd) +} + +func (s tenantService) DeleteOrganization(ctx context.Context, id influxdb.ID) error { + return s.oldOrgSvc.DeleteOrganization(ctx, id) +} + +func (s tenantService) FindUserResourceMappings(ctx context.Context, filter influxdb.UserResourceMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.UserResourceMapping, int, error) { + o, no, err := s.oldURMSvc.FindUserResourceMappings(ctx, filter, opt...) + if err != nil { + return o, no, err + } + n, _, err := s.newURMSvc.FindUserResourceMappings(ctx, filter, opt...) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserResourceMappings")) + } + return o, no, nil +} + +func (s tenantService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error { + return s.oldURMSvc.CreateUserResourceMapping(ctx, m) +} + +func (s tenantService) DeleteUserResourceMapping(ctx context.Context, resourceID, userID influxdb.ID) error { + return s.oldURMSvc.DeleteUserResourceMapping(ctx, resourceID, userID) +} + +func (s tenantService) FindUserByID(ctx context.Context, id influxdb.ID) (*influxdb.User, error) { + o, err := s.oldUserSvc.FindUserByID(ctx, id) + if err != nil { + return o, err + } + n, err := s.newUserSvc.FindUserByID(ctx, id) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserByID")) + } + return o, nil +} + +func (s tenantService) FindUser(ctx context.Context, filter influxdb.UserFilter) (*influxdb.User, error) { + o, err := s.oldUserSvc.FindUser(ctx, filter) + if err != nil { + return o, err + } + n, err := s.newUserSvc.FindUser(ctx, filter) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUser")) + } + return o, nil +} + +func (s tenantService) FindUsers(ctx context.Context, filter influxdb.UserFilter, opt ...influxdb.FindOptions) ([]*influxdb.User, int, error) { + o, no, err := s.oldUserSvc.FindUsers(ctx, filter, opt...) + if err != nil { + return o, no, err + } + n, _, err := s.newUserSvc.FindUsers(ctx, filter, opt...) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } else if diff := cmp.Diff(o, n); diff != "" { + s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUsers")) + } + return o, no, nil +} + +func (s tenantService) CreateUser(ctx context.Context, u *influxdb.User) error { + return s.oldUserSvc.CreateUser(ctx, u) +} + +func (s tenantService) UpdateUser(ctx context.Context, id influxdb.ID, upd influxdb.UserUpdate) (*influxdb.User, error) { + return s.oldUserSvc.UpdateUser(ctx, id, upd) +} + +func (s tenantService) DeleteUser(ctx context.Context, id influxdb.ID) error { + return s.oldUserSvc.DeleteUser(ctx, id) +} + +func (s tenantService) SetPassword(ctx context.Context, userID influxdb.ID, password string) error { + return s.oldPwdSvc.SetPassword(ctx, userID, password) +} + +func (s tenantService) ComparePassword(ctx context.Context, userID influxdb.ID, password string) error { + if err := s.oldPwdSvc.ComparePassword(ctx, userID, password); err != nil { + return err + } + err := s.newPwdSvc.ComparePassword(ctx, userID, password) + if err != nil { + s.log.Error("error in new meta store", zap.Error(err)) + } + return nil +} + +func (s tenantService) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, old, new string) error { + return s.oldPwdSvc.CompareAndSetPassword(ctx, userID, old, new) +} diff --git a/tenant/http_client_user.go b/tenant/http_client_user.go index a281dd1f97..ad3a0ace8d 100644 --- a/tenant/http_client_user.go +++ b/tenant/http_client_user.go @@ -126,19 +126,6 @@ func (s *UserClientService) DeleteUser(ctx context.Context, id influxdb.ID) erro Do(ctx) } -// FindUserByID returns a single user by ID. -func (s *UserClientService) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) { - var ps influxdb.PermissionSet - err := s.Client. - Get(prefixUsers, id.String(), "permissions"). - DecodeJSON(&ps). - Do(ctx) - if err != nil { - return nil, err - } - return ps, nil -} - // PasswordClientService is an http client to speak to the password service. type PasswordClientService struct { Client *httpc.Client diff --git a/tenant/http_server_user.go b/tenant/http_server_user.go index 85b343938d..ed23806516 100644 --- a/tenant/http_server_user.go +++ b/tenant/http_server_user.go @@ -55,7 +55,6 @@ func NewHTTPUserHandler(log *zap.Logger, userService influxdb.UserService, passw r.Get("/", svr.handleGetUser) r.Patch("/", svr.handlePatchUser) r.Delete("/", svr.handleDeleteUser) - r.Get("/permissions", svr.handleGetPermissions) r.Put("/password", svr.handlePutUserPassword) r.Post("/password", svr.handlePostUserPassword) }) @@ -258,31 +257,6 @@ func (h *UserHandler) handleGetUser(w http.ResponseWriter, r *http.Request) { h.api.Respond(w, r, http.StatusOK, newUserResponse(b)) } -func (h *UserHandler) handleGetPermissions(w http.ResponseWriter, r *http.Request) { - id := chi.URLParam(r, "id") - if id == "" { - err := &influxdb.Error{ - Code: influxdb.EInvalid, - Msg: "url missing id", - } - h.api.Err(w, r, err) - return - } - var i influxdb.ID - if err := i.DecodeFromString(id); err != nil { - h.api.Err(w, r, err) - return - } - - ps, err := h.userSvc.FindPermissionForUser(r.Context(), i) - if err != nil { - h.api.Err(w, r, err) - return - } - - h.api.Respond(w, r, http.StatusOK, ps) -} - type getUserRequest struct { UserID influxdb.ID } diff --git a/tenant/middleware_user_auth.go b/tenant/middleware_user_auth.go index a7800a0aee..a8d75c28ce 100644 --- a/tenant/middleware_user_auth.go +++ b/tenant/middleware_user_auth.go @@ -79,13 +79,6 @@ func (s *AuthedUserService) DeleteUser(ctx context.Context, id influxdb.ID) erro return s.s.DeleteUser(ctx, id) } -func (s *AuthedUserService) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) { - if _, _, err := authorizer.AuthorizeReadResource(ctx, influxdb.UsersResourceType, id); err != nil { - return nil, err - } - return s.s.FindPermissionForUser(ctx, id) -} - // AuthedPasswordService is a new authorization middleware for a password service. type AuthedPasswordService struct { s influxdb.PasswordsService diff --git a/tenant/middleware_user_logging.go b/tenant/middleware_user_logging.go index b72994f418..d80ea4831c 100644 --- a/tenant/middleware_user_logging.go +++ b/tenant/middleware_user_logging.go @@ -99,19 +99,6 @@ func (l *UserLogger) DeleteUser(ctx context.Context, id influxdb.ID) (err error) return l.userService.DeleteUser(ctx, id) } -func (l *UserLogger) FindPermissionForUser(ctx context.Context, id influxdb.ID) (ps influxdb.PermissionSet, err error) { - defer func(start time.Time) { - dur := zap.Duration("took", time.Since(start)) - if err != nil { - msg := fmt.Sprintf("failed to delete user with ID %v", id) - l.logger.Debug(msg, zap.Error(err), dur) - return - } - l.logger.Debug("find permission for user", dur) - }(time.Now()) - return l.userService.FindPermissionForUser(ctx, id) -} - type PasswordLogger struct { logger *zap.Logger pwdService influxdb.PasswordsService diff --git a/tenant/middleware_user_metrics.go b/tenant/middleware_user_metrics.go index 11cfce0cae..d4450cb4be 100644 --- a/tenant/middleware_user_metrics.go +++ b/tenant/middleware_user_metrics.go @@ -63,12 +63,6 @@ func (m *UserMetrics) DeleteUser(ctx context.Context, id influxdb.ID) error { return rec(err) } -func (m *UserMetrics) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) { - rec := m.rec.Record("find_permission_for_user") - ps, err := m.userService.FindPermissionForUser(ctx, id) - return ps, rec(err) -} - type PasswordMetrics struct { // RED metrics rec *metric.REDClient diff --git a/tenant/service_urm.go b/tenant/service_urm.go index 93390ab430..f03bdfa5ec 100644 --- a/tenant/service_urm.go +++ b/tenant/service_urm.go @@ -60,19 +60,3 @@ func (s *Service) removeResourceRelations(ctx context.Context, tx kv.Tx, resourc } return nil } - -func permissionFromMapping(mappings []*influxdb.UserResourceMapping) ([]influxdb.Permission, error) { - ps := make([]influxdb.Permission, 0, len(mappings)) - for _, m := range mappings { - p, err := m.ToPermissions() - if err != nil { - return nil, &influxdb.Error{ - Err: err, - } - } - - ps = append(ps, p...) - } - - return ps, nil -} diff --git a/tenant/service_user.go b/tenant/service_user.go index 524cd4f35b..eb3699736d 100644 --- a/tenant/service_user.go +++ b/tenant/service_user.go @@ -131,38 +131,6 @@ func (s *Service) DeleteUser(ctx context.Context, id influxdb.ID) error { return err } -// FindPermissionForUser gets the full set of permission for a specified user id -func (s *Service) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) { - mappings, _, err := s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Limit: 100}) - if err != nil { - return nil, err - } - - permissions, err := permissionFromMapping(mappings) - if err != nil { - return nil, err - } - - if len(mappings) >= 100 { - // if we got 100 mappings we probably need to pull more pages - // account for paginated results - for i := len(mappings); len(mappings) > 0; i += len(mappings) { - mappings, _, err = s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Offset: i, Limit: 100}) - if err != nil { - return nil, err - } - pms, err := permissionFromMapping(mappings) - if err != nil { - return nil, err - } - permissions = append(permissions, pms...) - } - } - - permissions = append(permissions, influxdb.MePermissions(uid)...) - return permissions, nil -} - // SetPassword overrides the password of a known user. func (s *Service) SetPassword(ctx context.Context, userID influxdb.ID, password string) error { if len(password) < 8 { diff --git a/tenant/service_user_test.go b/tenant/service_user_test.go index a2b6c9e1ce..98ce43369c 100644 --- a/tenant/service_user_test.go +++ b/tenant/service_user_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/google/go-cmp/cmp" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kv" "github.com/influxdata/influxdb/v2/tenant" @@ -88,77 +87,3 @@ func initPasswordsService(s kv.Store, f influxdbtesting.PasswordFields, t *testi } } } - -func TestFindPermissionsFromUser(t *testing.T) { - s, _, err := NewTestInmemStore(t) - if err != nil { - t.Fatal(err) - } - storage := tenant.NewStore(s) - svc := tenant.NewService(storage) - - // createUser - u := &influxdb.User{ - Name: "rockstar", - Status: influxdb.Active, - } - - if err := svc.CreateUser(context.Background(), u); err != nil { - t.Fatal(err) - } - - ctx := context.Background() - - // createSomeURMS - err = svc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{ - UserID: u.ID, - UserType: influxdb.Member, - ResourceType: influxdb.OrgsResourceType, - ResourceID: 1, - }) - if err != nil { - t.Fatal(err) - } - - err = svc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{ - UserID: u.ID, - UserType: influxdb.Owner, - ResourceType: influxdb.BucketsResourceType, - ResourceID: 2, - }) - if err != nil { - t.Fatal(err) - } - // pull the permisssions for this user - perms, err := svc.FindPermissionForUser(ctx, u.ID) - if err != nil { - t.Fatal(err) - } - - orgID := influxdb.ID(1) - expected := influxdb.PermissionSet{ - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.AuthorizationsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.BucketsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DashboardsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{ID: &orgID, Type: influxdb.OrgsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.SourcesResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.TasksResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.TelegrafsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.UsersResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.VariablesResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ScraperResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.SecretsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.LabelsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ViewsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DocumentsResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotificationRuleResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotificationEndpointResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ChecksResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DBRPResourceType}}, - influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}}, - influxdb.Permission{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}}, - } - if !cmp.Equal(perms, expected) { - t.Fatalf("inequal response for find params %+v", cmp.Diff(perms, expected)) - } -} diff --git a/user.go b/user.go index 3e0f5a4323..8f9a61068a 100644 --- a/user.go +++ b/user.go @@ -61,9 +61,6 @@ type UserService interface { // Removes a user by ID. DeleteUser(ctx context.Context, id ID) error - - // FindPermissionForUser - FindPermissionForUser(ctx context.Context, UserID ID) (PermissionSet, error) } // UserUpdate represents updates to a user. diff --git a/user_resource_mapping.go b/user_resource_mapping.go index 795c15b448..a45cc6c741 100644 --- a/user_resource_mapping.go +++ b/user_resource_mapping.go @@ -143,57 +143,28 @@ type UserResourceMappingFilter struct { } func (m *UserResourceMapping) ownerPerms() ([]Permission, error) { + ps := []Permission{} + // TODO(desa): how to grant access to specific resources. + if m.ResourceType == OrgsResourceType { - return OwnerPermissions(m.ResourceID), nil + ps = append(ps, OwnerPermissions(m.ResourceID)...) } - ps := []Permission{ - // TODO: Uncomment these once the URM system is no longer being used for find lookups for: - // Telegraf - // DashBoard - // notification rule - // notification endpoint - // Permission{ - // Action: ReadAction, - // Resource: Resource{ - // Type: m.ResourceType, - // ID: &m.ResourceID, - // }, - // }, - // Permission{ - // Action: WriteAction, - // Resource: Resource{ - // Type: m.ResourceType, - // ID: &m.ResourceID, - // }, - // }, - } return ps, nil } func (m *UserResourceMapping) memberPerms() ([]Permission, error) { + ps := []Permission{} + // TODO(desa): how to grant access to specific resources. + if m.ResourceType == OrgsResourceType { - return MemberPermissions(m.ResourceID), nil + ps = append(ps, MemberPermissions(m.ResourceID)...) } if m.ResourceType == BucketsResourceType { - return []Permission{MemberBucketPermission(m.ResourceID)}, nil + ps = append(ps, MemberBucketPermission(m.ResourceID)) } - ps := []Permission{ - // TODO: Uncomment these once the URM system is no longer being used for find lookups for: - // Telegraf - // DashBoard - // notification rule - // notification endpoint - // Permission{ - // Action: ReadAction, - // Resource: Resource{ - // Type: m.ResourceType, - // ID: &m.ResourceID, - // }, - // }, - } return ps, nil }