revert(tasks): Add new permission lookup pattern for executor (#18869) (#18915)

This reverts commit e9e4d794cf.
pull/18919/head
George 2020-07-10 11:05:07 +01:00 committed by GitHub
parent 8b5723e986
commit b8f5604613
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 476 additions and 418 deletions

View File

@ -75,10 +75,3 @@ func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error {
}
return s.s.DeleteUser(ctx, id)
}
func (s *UserService) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Msg: "not implemented",
}
}

View File

@ -752,7 +752,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
executor, executorMetrics := executor.NewExecutor(
m.log.With(zap.String("service", "task-executor")),
query.QueryServiceBridge{AsyncQueryService: m.queryController},
ts.UserSvc,
authSvc,
combinedTaskService,
combinedTaskService,
)

View File

@ -596,13 +596,6 @@ func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error {
Do(ctx)
}
func (s *UserService) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Msg: "not implemented",
}
}
// PasswordService is an http client to speak to the password service.
type PasswordService struct {
Client *httpc.Client

105
kv/duplicate_read_test.go Normal file
View File

@ -0,0 +1,105 @@
package kv_test
import (
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tenant"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func initBoltDuplicateReadBucketService(f influxdbtesting.BucketFields, t *testing.T) (influxdb.BucketService, string, func()) {
s, closeInMem, err := NewTestInmemStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initBucketService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadBucketService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
closeSvc()
closeInMem()
}
}
func TestBoltDuplicateReadBucketService(t *testing.T) {
influxdbtesting.BucketService(initBoltDuplicateReadBucketService, t)
}
func initBoltDuplicateReadOrganizationService(f influxdbtesting.OrganizationFields, t *testing.T) (influxdb.OrganizationService, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initOrganizationService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadOrganizationService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
closeSvc()
closeBolt()
}
}
func TestBoltDuplicateReadOrganizationService(t *testing.T) {
influxdbtesting.OrganizationService(initBoltDuplicateReadOrganizationService, t)
}
func initBoltDuplicateReadUserResourceMappingService(f influxdbtesting.UserResourceFields, t *testing.T) (influxdb.UserResourceMappingService, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initUserResourceMappingService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadUserResourceMappingService(zaptest.NewLogger(t), svc, newSvc)
return svc, func() {
closeSvc()
closeBolt()
}
}
func TestBoltDuplicateReadUserResourceMappingService(t *testing.T) {
influxdbtesting.UserResourceMappingService(initBoltDuplicateReadUserResourceMappingService, t)
}
func initBoltDuplicateReadUserService(f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initUserService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadUserService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
closeSvc()
closeBolt()
}
}
func TestBoltDuplicateReadUserService(t *testing.T) {
influxdbtesting.UserService(initBoltDuplicateReadUserService, t)
}
func initBoltDuplicateReadPasswordsService(f influxdbtesting.PasswordFields, t *testing.T) (influxdb.PasswordsService, func()) {
s, closeStore, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initPasswordsService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadPasswordsService(zaptest.NewLogger(t), svc, newSvc)
return svc, func() {
closeSvc()
closeStore()
}
}
func TestBoltDuplicateReadPasswordService(t *testing.T) {
influxdbtesting.PasswordsService(initBoltDuplicateReadPasswordsService, t)
}

View File

@ -442,13 +442,6 @@ func (s *Service) deleteUser(ctx context.Context, tx Tx, id influxdb.ID) error {
return nil
}
func (s *Service) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Msg: "not implemented",
}
}
func (s *Service) deleteUsersAuthorizations(ctx context.Context, tx Tx, id influxdb.ID) error {
authFilter := influxdb.AuthorizationFilter{
UserID: &id,

View File

@ -12,13 +12,12 @@ var _ platform.UserService = (*UserService)(nil)
// also makes it a suitable mock to use wherever an platform.UserService is required.
type UserService struct {
// Methods for a platform.UserService
FindUserByIDFn func(context.Context, platform.ID) (*platform.User, error)
FindUsersFn func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error)
CreateUserFn func(context.Context, *platform.User) error
DeleteUserFn func(context.Context, platform.ID) error
FindUserFn func(context.Context, platform.UserFilter) (*platform.User, error)
UpdateUserFn func(context.Context, platform.ID, platform.UserUpdate) (*platform.User, error)
FindPermissionForUserFn func(context.Context, platform.ID) (platform.PermissionSet, error)
FindUserByIDFn func(context.Context, platform.ID) (*platform.User, error)
FindUsersFn func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error)
CreateUserFn func(context.Context, *platform.User) error
DeleteUserFn func(context.Context, platform.ID) error
FindUserFn func(context.Context, platform.UserFilter) (*platform.User, error)
UpdateUserFn func(context.Context, platform.ID, platform.UserUpdate) (*platform.User, error)
}
// NewUserService returns a mock of UserService where its methods will return zero values.
@ -32,7 +31,6 @@ func NewUserService() *UserService {
FindUsersFn: func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error) {
return nil, 0, nil
},
FindPermissionForUserFn: func(context.Context, platform.ID) (platform.PermissionSet, error) { return nil, nil },
}
}
@ -65,7 +63,3 @@ func (s *UserService) FindUser(ctx context.Context, filter platform.UserFilter)
func (s *UserService) UpdateUser(ctx context.Context, id platform.ID, upd platform.UserUpdate) (*platform.User, error) {
return s.UpdateUserFn(ctx, id, upd)
}
func (s *UserService) FindPermissionForUser(ctx context.Context, uid platform.ID) (platform.PermissionSet, error) {
return s.FindPermissionForUserFn(ctx, uid)
}

View File

@ -25,10 +25,6 @@ const (
var _ scheduler.Executor = (*Executor)(nil)
type PermissionService interface {
FindPermissionForUser(ctx context.Context, UserID influxdb.ID) (influxdb.PermissionSet, error)
}
type Promise interface {
ID() influxdb.ID
Cancel(ctx context.Context)
@ -88,7 +84,7 @@ func WithNonSystemCompilerBuilder(builder CompilerBuilderFunc) executorOption {
}
// NewExecutor creates a new task executor
func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) {
func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) {
cfg := &executorConfig{
maxWorkers: defaultMaxWorkers,
systemBuildCompiler: NewASTCompiler,
@ -103,7 +99,7 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, t
ts: ts,
tcs: tcs,
qs: qs,
ps: us,
as: as,
currentPromises: sync.Map{},
promiseQueue: make(chan *promise, maxPromises),
@ -130,7 +126,7 @@ type Executor struct {
tcs backend.TaskControlService
qs query.QueryService
ps PermissionService
as influxdb.AuthorizationService
metrics *ExecutorMetrics
@ -280,24 +276,16 @@ func (e *Executor) createPromise(ctx context.Context, run *influxdb.Run) (*promi
if err != nil {
return nil, err
}
perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID)
if err != nil {
return nil, err
if !t.Authorization.GetUserID().Valid() {
t.Authorization.UserID = t.OwnerID
}
ctx, cancel := context.WithCancel(ctx)
// create promise
p := &promise{
run: run,
task: t,
auth: &influxdb.Authorization{
Status: influxdb.Active,
UserID: t.OwnerID,
ID: influxdb.ID(1),
OrgID: t.OrganizationID,
Permissions: perm,
},
run: run,
task: t,
auth: t.Authorization,
createdAt: time.Now().UTC(),
done: make(chan struct{}),
ctx: ctx,
@ -454,7 +442,7 @@ func (w *worker) executeQuery(p *promise) {
// start
w.start(p)
ctx = icontext.SetAuthorizer(ctx, p.auth)
ctx = icontext.SetAuthorizer(ctx, p.task.Authorization)
buildCompiler := w.systemBuildCompiler
if p.task.Type != influxdb.TaskSystemType {

View File

@ -10,7 +10,6 @@ import (
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/influxdata/flux"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
@ -23,7 +22,6 @@ import (
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/task/backend"
"github.com/influxdata/influxdb/v2/task/backend/executor/mock"
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
@ -64,16 +62,13 @@ func taskExecutorSystem(t *testing.T) tes {
if err := all.Up(ctx, logger, store); err != nil {
t.Fatal(err)
}
ctrl := gomock.NewController(t)
ps := mock.NewMockPermissionService(ctrl)
ps.EXPECT().FindPermissionForUser(gomock.Any(), gomock.Any()).Return(influxdb.PermissionSet{}, nil).AnyTimes()
var (
svc = kv.NewService(logger, store, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
tcs = &taskControlService{TaskControlService: svc}
ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, ps, svc, tcs)
ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, svc, svc, tcs)
)
return tes{
svc: aqs,

View File

@ -1,127 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: executor.go
// Package mock is a generated GoMock package.
package mock
import (
context "context"
gomock "github.com/golang/mock/gomock"
influxdb "github.com/influxdata/influxdb/v2"
reflect "reflect"
)
// MockPermissionService is a mock of PermissionService interface
type MockPermissionService struct {
ctrl *gomock.Controller
recorder *MockPermissionServiceMockRecorder
}
// MockPermissionServiceMockRecorder is the mock recorder for MockPermissionService
type MockPermissionServiceMockRecorder struct {
mock *MockPermissionService
}
// NewMockPermissionService creates a new mock instance
func NewMockPermissionService(ctrl *gomock.Controller) *MockPermissionService {
mock := &MockPermissionService{ctrl: ctrl}
mock.recorder = &MockPermissionServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockPermissionService) EXPECT() *MockPermissionServiceMockRecorder {
return m.recorder
}
// FindPermissionForUser mocks base method
func (m *MockPermissionService) FindPermissionForUser(ctx context.Context, UserID influxdb.ID) (influxdb.PermissionSet, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindPermissionForUser", ctx, UserID)
ret0, _ := ret[0].(influxdb.PermissionSet)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FindPermissionForUser indicates an expected call of FindPermissionForUser
func (mr *MockPermissionServiceMockRecorder) FindPermissionForUser(ctx, UserID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindPermissionForUser", reflect.TypeOf((*MockPermissionService)(nil).FindPermissionForUser), ctx, UserID)
}
// MockPromise is a mock of Promise interface
type MockPromise struct {
ctrl *gomock.Controller
recorder *MockPromiseMockRecorder
}
// MockPromiseMockRecorder is the mock recorder for MockPromise
type MockPromiseMockRecorder struct {
mock *MockPromise
}
// NewMockPromise creates a new mock instance
func NewMockPromise(ctrl *gomock.Controller) *MockPromise {
mock := &MockPromise{ctrl: ctrl}
mock.recorder = &MockPromiseMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockPromise) EXPECT() *MockPromiseMockRecorder {
return m.recorder
}
// ID mocks base method
func (m *MockPromise) ID() influxdb.ID {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ID")
ret0, _ := ret[0].(influxdb.ID)
return ret0
}
// ID indicates an expected call of ID
func (mr *MockPromiseMockRecorder) ID() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockPromise)(nil).ID))
}
// Cancel mocks base method
func (m *MockPromise) Cancel(ctx context.Context) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Cancel", ctx)
}
// Cancel indicates an expected call of Cancel
func (mr *MockPromiseMockRecorder) Cancel(ctx interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cancel", reflect.TypeOf((*MockPromise)(nil).Cancel), ctx)
}
// Done mocks base method
func (m *MockPromise) Done() <-chan struct{} {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Done")
ret0, _ := ret[0].(<-chan struct{})
return ret0
}
// Done indicates an expected call of Done
func (mr *MockPromiseMockRecorder) Done() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockPromise)(nil).Done))
}
// Error mocks base method
func (m *MockPromise) Error() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Error")
ret0, _ := ret[0].(error)
return ret0
}
// Error indicates an expected call of Error
func (mr *MockPromiseMockRecorder) Error() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockPromise)(nil).Error))
}

344
tenant/duplicate_reads.go Normal file
View File

@ -0,0 +1,344 @@
package tenant
import (
"context"
"fmt"
"sort"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"go.uber.org/zap"
)
var _ influxdb.TenantService = (*tenantService)(nil)
var bucketCmpOptions = cmp.Options{
cmp.Transformer("Sort", func(in []*influxdb.Bucket) []*influxdb.Bucket {
out := append([]*influxdb.Bucket(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].Name > out[j].Name
})
return out
}),
}
// readOnlyStore is a wrapper for kv.Store that ensures that updates are not applied.
type readOnlyStore struct {
kv.Store
}
func (r readOnlyStore) Update(context.Context, func(kv.Tx) error) error {
return nil
}
type tenantService struct {
log *zap.Logger
oldBucketSvc influxdb.BucketService
newBucketSvc influxdb.BucketService
oldOrgSvc influxdb.OrganizationService
newOrgSvc influxdb.OrganizationService
oldURMSvc influxdb.UserResourceMappingService
newURMSvc influxdb.UserResourceMappingService
oldUserSvc influxdb.UserService
newUserSvc influxdb.UserService
oldPwdSvc influxdb.PasswordsService
newPwdSvc influxdb.PasswordsService
}
// NewReadOnlyStore returns a Store that cannot update the underlying kv.Store.
func NewReadOnlyStore(store kv.Store) *Store {
return NewStore(readOnlyStore{Store: store})
}
// NewDuplicateReadTenantService returns a tenant service that duplicates the reads to oldSvc and newSvc.
// The foreseen use case is to compare two service versions, an old one and a new one.
// The resulting influxdb.TenantService:
// - forwards writes to the old service;
// - reads from the old one, if no error is encountered, it reads from the new one;
// - compares the results obtained and logs the difference, if any.
func NewDuplicateReadTenantService(log *zap.Logger, oldSvc, newSvc influxdb.TenantService) influxdb.TenantService {
return tenantService{
log: log,
oldBucketSvc: oldSvc,
oldOrgSvc: oldSvc,
oldURMSvc: oldSvc,
oldUserSvc: oldSvc,
oldPwdSvc: oldSvc,
newBucketSvc: newSvc,
newOrgSvc: newSvc,
newURMSvc: newSvc,
newUserSvc: newSvc,
newPwdSvc: newSvc,
}
}
// NewDuplicateReadBucketService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadBucketService(log *zap.Logger, oldSvc, newSvc influxdb.BucketService) influxdb.BucketService {
return tenantService{
log: log,
oldBucketSvc: oldSvc,
newBucketSvc: newSvc,
}
}
// NewDuplicateReadOrganizationService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadOrganizationService(log *zap.Logger, oldSvc, newSvc influxdb.OrganizationService) influxdb.OrganizationService {
return tenantService{
log: log,
oldOrgSvc: oldSvc,
newOrgSvc: newSvc,
}
}
// NewDuplicateReadUserResourceMappingService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadUserResourceMappingService(log *zap.Logger, oldSvc, newSvc influxdb.UserResourceMappingService) influxdb.UserResourceMappingService {
return tenantService{
log: log,
oldURMSvc: oldSvc,
newURMSvc: newSvc,
}
}
// NewDuplicateReadUserService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadUserService(log *zap.Logger, oldSvc, newSvc influxdb.UserService) influxdb.UserService {
return tenantService{
log: log,
oldUserSvc: oldSvc,
newUserSvc: newSvc,
}
}
// NewDuplicateReadPasswordsService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadPasswordsService(log *zap.Logger, oldSvc, newSvc influxdb.PasswordsService) influxdb.PasswordsService {
return tenantService{
log: log,
oldPwdSvc: oldSvc,
newPwdSvc: newSvc,
}
}
func (s tenantService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
o, err := s.oldBucketSvc.FindBucketByID(ctx, id)
if err != nil {
return o, err
}
n, err := s.newBucketSvc.FindBucketByID(ctx, id)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByID"))
}
return o, nil
}
func (s tenantService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
o, err := s.oldBucketSvc.FindBucketByName(ctx, orgID, name)
if err != nil {
return o, err
}
n, err := s.newBucketSvc.FindBucketByName(ctx, orgID, name)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByName"))
}
return o, nil
}
func (s tenantService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
o, err := s.oldBucketSvc.FindBucket(ctx, filter)
if err != nil {
return o, err
}
n, err := s.newBucketSvc.FindBucket(ctx, filter)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucket"))
}
return o, nil
}
func (s tenantService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
o, no, err := s.oldBucketSvc.FindBuckets(ctx, filter, opt...)
if err != nil {
return o, no, err
}
n, _, err := s.newBucketSvc.FindBuckets(ctx, filter, opt...)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n, bucketCmpOptions); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBuckets"))
}
return o, no, nil
}
func (s tenantService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
return s.oldBucketSvc.CreateBucket(ctx, b)
}
func (s tenantService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
return s.oldBucketSvc.UpdateBucket(ctx, id, upd)
}
func (s tenantService) DeleteBucket(ctx context.Context, id influxdb.ID) error {
return s.oldBucketSvc.DeleteBucket(ctx, id)
}
func (s tenantService) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) {
o, err := s.oldOrgSvc.FindOrganizationByID(ctx, id)
if err != nil {
return o, err
}
n, err := s.newOrgSvc.FindOrganizationByID(ctx, id)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizationByID"))
}
return o, nil
}
func (s tenantService) FindOrganization(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
o, err := s.oldOrgSvc.FindOrganization(ctx, filter)
if err != nil {
return o, err
}
n, err := s.newOrgSvc.FindOrganization(ctx, filter)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganization"))
}
return o, nil
}
func (s tenantService) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) {
o, no, err := s.oldOrgSvc.FindOrganizations(ctx, filter, opt...)
if err != nil {
return o, no, err
}
n, _, err := s.newOrgSvc.FindOrganizations(ctx, filter, opt...)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizations"))
}
return o, no, nil
}
func (s tenantService) CreateOrganization(ctx context.Context, b *influxdb.Organization) error {
return s.oldOrgSvc.CreateOrganization(ctx, b)
}
func (s tenantService) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) {
return s.oldOrgSvc.UpdateOrganization(ctx, id, upd)
}
func (s tenantService) DeleteOrganization(ctx context.Context, id influxdb.ID) error {
return s.oldOrgSvc.DeleteOrganization(ctx, id)
}
func (s tenantService) FindUserResourceMappings(ctx context.Context, filter influxdb.UserResourceMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.UserResourceMapping, int, error) {
o, no, err := s.oldURMSvc.FindUserResourceMappings(ctx, filter, opt...)
if err != nil {
return o, no, err
}
n, _, err := s.newURMSvc.FindUserResourceMappings(ctx, filter, opt...)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserResourceMappings"))
}
return o, no, nil
}
func (s tenantService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
return s.oldURMSvc.CreateUserResourceMapping(ctx, m)
}
func (s tenantService) DeleteUserResourceMapping(ctx context.Context, resourceID, userID influxdb.ID) error {
return s.oldURMSvc.DeleteUserResourceMapping(ctx, resourceID, userID)
}
func (s tenantService) FindUserByID(ctx context.Context, id influxdb.ID) (*influxdb.User, error) {
o, err := s.oldUserSvc.FindUserByID(ctx, id)
if err != nil {
return o, err
}
n, err := s.newUserSvc.FindUserByID(ctx, id)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserByID"))
}
return o, nil
}
func (s tenantService) FindUser(ctx context.Context, filter influxdb.UserFilter) (*influxdb.User, error) {
o, err := s.oldUserSvc.FindUser(ctx, filter)
if err != nil {
return o, err
}
n, err := s.newUserSvc.FindUser(ctx, filter)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUser"))
}
return o, nil
}
func (s tenantService) FindUsers(ctx context.Context, filter influxdb.UserFilter, opt ...influxdb.FindOptions) ([]*influxdb.User, int, error) {
o, no, err := s.oldUserSvc.FindUsers(ctx, filter, opt...)
if err != nil {
return o, no, err
}
n, _, err := s.newUserSvc.FindUsers(ctx, filter, opt...)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUsers"))
}
return o, no, nil
}
func (s tenantService) CreateUser(ctx context.Context, u *influxdb.User) error {
return s.oldUserSvc.CreateUser(ctx, u)
}
func (s tenantService) UpdateUser(ctx context.Context, id influxdb.ID, upd influxdb.UserUpdate) (*influxdb.User, error) {
return s.oldUserSvc.UpdateUser(ctx, id, upd)
}
func (s tenantService) DeleteUser(ctx context.Context, id influxdb.ID) error {
return s.oldUserSvc.DeleteUser(ctx, id)
}
func (s tenantService) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
return s.oldPwdSvc.SetPassword(ctx, userID, password)
}
func (s tenantService) ComparePassword(ctx context.Context, userID influxdb.ID, password string) error {
if err := s.oldPwdSvc.ComparePassword(ctx, userID, password); err != nil {
return err
}
err := s.newPwdSvc.ComparePassword(ctx, userID, password)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
}
return nil
}
func (s tenantService) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, old, new string) error {
return s.oldPwdSvc.CompareAndSetPassword(ctx, userID, old, new)
}

View File

@ -126,19 +126,6 @@ func (s *UserClientService) DeleteUser(ctx context.Context, id influxdb.ID) erro
Do(ctx)
}
// FindUserByID returns a single user by ID.
func (s *UserClientService) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
var ps influxdb.PermissionSet
err := s.Client.
Get(prefixUsers, id.String(), "permissions").
DecodeJSON(&ps).
Do(ctx)
if err != nil {
return nil, err
}
return ps, nil
}
// PasswordClientService is an http client to speak to the password service.
type PasswordClientService struct {
Client *httpc.Client

View File

@ -55,7 +55,6 @@ func NewHTTPUserHandler(log *zap.Logger, userService influxdb.UserService, passw
r.Get("/", svr.handleGetUser)
r.Patch("/", svr.handlePatchUser)
r.Delete("/", svr.handleDeleteUser)
r.Get("/permissions", svr.handleGetPermissions)
r.Put("/password", svr.handlePutUserPassword)
r.Post("/password", svr.handlePostUserPassword)
})
@ -258,31 +257,6 @@ func (h *UserHandler) handleGetUser(w http.ResponseWriter, r *http.Request) {
h.api.Respond(w, r, http.StatusOK, newUserResponse(b))
}
func (h *UserHandler) handleGetPermissions(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if id == "" {
err := &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "url missing id",
}
h.api.Err(w, r, err)
return
}
var i influxdb.ID
if err := i.DecodeFromString(id); err != nil {
h.api.Err(w, r, err)
return
}
ps, err := h.userSvc.FindPermissionForUser(r.Context(), i)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, ps)
}
type getUserRequest struct {
UserID influxdb.ID
}

View File

@ -79,13 +79,6 @@ func (s *AuthedUserService) DeleteUser(ctx context.Context, id influxdb.ID) erro
return s.s.DeleteUser(ctx, id)
}
func (s *AuthedUserService) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
if _, _, err := authorizer.AuthorizeReadResource(ctx, influxdb.UsersResourceType, id); err != nil {
return nil, err
}
return s.s.FindPermissionForUser(ctx, id)
}
// AuthedPasswordService is a new authorization middleware for a password service.
type AuthedPasswordService struct {
s influxdb.PasswordsService

View File

@ -99,19 +99,6 @@ func (l *UserLogger) DeleteUser(ctx context.Context, id influxdb.ID) (err error)
return l.userService.DeleteUser(ctx, id)
}
func (l *UserLogger) FindPermissionForUser(ctx context.Context, id influxdb.ID) (ps influxdb.PermissionSet, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
msg := fmt.Sprintf("failed to delete user with ID %v", id)
l.logger.Debug(msg, zap.Error(err), dur)
return
}
l.logger.Debug("find permission for user", dur)
}(time.Now())
return l.userService.FindPermissionForUser(ctx, id)
}
type PasswordLogger struct {
logger *zap.Logger
pwdService influxdb.PasswordsService

View File

@ -63,12 +63,6 @@ func (m *UserMetrics) DeleteUser(ctx context.Context, id influxdb.ID) error {
return rec(err)
}
func (m *UserMetrics) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
rec := m.rec.Record("find_permission_for_user")
ps, err := m.userService.FindPermissionForUser(ctx, id)
return ps, rec(err)
}
type PasswordMetrics struct {
// RED metrics
rec *metric.REDClient

View File

@ -60,19 +60,3 @@ func (s *Service) removeResourceRelations(ctx context.Context, tx kv.Tx, resourc
}
return nil
}
func permissionFromMapping(mappings []*influxdb.UserResourceMapping) ([]influxdb.Permission, error) {
ps := make([]influxdb.Permission, 0, len(mappings))
for _, m := range mappings {
p, err := m.ToPermissions()
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
ps = append(ps, p...)
}
return ps, nil
}

View File

@ -131,38 +131,6 @@ func (s *Service) DeleteUser(ctx context.Context, id influxdb.ID) error {
return err
}
// FindPermissionForUser gets the full set of permission for a specified user id
func (s *Service) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
mappings, _, err := s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Limit: 100})
if err != nil {
return nil, err
}
permissions, err := permissionFromMapping(mappings)
if err != nil {
return nil, err
}
if len(mappings) >= 100 {
// if we got 100 mappings we probably need to pull more pages
// account for paginated results
for i := len(mappings); len(mappings) > 0; i += len(mappings) {
mappings, _, err = s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Offset: i, Limit: 100})
if err != nil {
return nil, err
}
pms, err := permissionFromMapping(mappings)
if err != nil {
return nil, err
}
permissions = append(permissions, pms...)
}
}
permissions = append(permissions, influxdb.MePermissions(uid)...)
return permissions, nil
}
// SetPassword overrides the password of a known user.
func (s *Service) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
if len(password) < 8 {

View File

@ -4,7 +4,6 @@ import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/tenant"
@ -88,77 +87,3 @@ func initPasswordsService(s kv.Store, f influxdbtesting.PasswordFields, t *testi
}
}
}
func TestFindPermissionsFromUser(t *testing.T) {
s, _, err := NewTestInmemStore(t)
if err != nil {
t.Fatal(err)
}
storage := tenant.NewStore(s)
svc := tenant.NewService(storage)
// createUser
u := &influxdb.User{
Name: "rockstar",
Status: influxdb.Active,
}
if err := svc.CreateUser(context.Background(), u); err != nil {
t.Fatal(err)
}
ctx := context.Background()
// createSomeURMS
err = svc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
UserID: u.ID,
UserType: influxdb.Member,
ResourceType: influxdb.OrgsResourceType,
ResourceID: 1,
})
if err != nil {
t.Fatal(err)
}
err = svc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
UserID: u.ID,
UserType: influxdb.Owner,
ResourceType: influxdb.BucketsResourceType,
ResourceID: 2,
})
if err != nil {
t.Fatal(err)
}
// pull the permisssions for this user
perms, err := svc.FindPermissionForUser(ctx, u.ID)
if err != nil {
t.Fatal(err)
}
orgID := influxdb.ID(1)
expected := influxdb.PermissionSet{
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.AuthorizationsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.BucketsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DashboardsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{ID: &orgID, Type: influxdb.OrgsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.SourcesResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.TasksResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.TelegrafsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.UsersResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.VariablesResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ScraperResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.SecretsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.LabelsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ViewsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DocumentsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotificationRuleResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotificationEndpointResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ChecksResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DBRPResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}},
influxdb.Permission{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}},
}
if !cmp.Equal(perms, expected) {
t.Fatalf("inequal response for find params %+v", cmp.Diff(perms, expected))
}
}

View File

@ -61,9 +61,6 @@ type UserService interface {
// Removes a user by ID.
DeleteUser(ctx context.Context, id ID) error
// FindPermissionForUser
FindPermissionForUser(ctx context.Context, UserID ID) (PermissionSet, error)
}
// UserUpdate represents updates to a user.

View File

@ -143,57 +143,28 @@ type UserResourceMappingFilter struct {
}
func (m *UserResourceMapping) ownerPerms() ([]Permission, error) {
ps := []Permission{}
// TODO(desa): how to grant access to specific resources.
if m.ResourceType == OrgsResourceType {
return OwnerPermissions(m.ResourceID), nil
ps = append(ps, OwnerPermissions(m.ResourceID)...)
}
ps := []Permission{
// TODO: Uncomment these once the URM system is no longer being used for find lookups for:
// Telegraf
// DashBoard
// notification rule
// notification endpoint
// Permission{
// Action: ReadAction,
// Resource: Resource{
// Type: m.ResourceType,
// ID: &m.ResourceID,
// },
// },
// Permission{
// Action: WriteAction,
// Resource: Resource{
// Type: m.ResourceType,
// ID: &m.ResourceID,
// },
// },
}
return ps, nil
}
func (m *UserResourceMapping) memberPerms() ([]Permission, error) {
ps := []Permission{}
// TODO(desa): how to grant access to specific resources.
if m.ResourceType == OrgsResourceType {
return MemberPermissions(m.ResourceID), nil
ps = append(ps, MemberPermissions(m.ResourceID)...)
}
if m.ResourceType == BucketsResourceType {
return []Permission{MemberBucketPermission(m.ResourceID)}, nil
ps = append(ps, MemberBucketPermission(m.ResourceID))
}
ps := []Permission{
// TODO: Uncomment these once the URM system is no longer being used for find lookups for:
// Telegraf
// DashBoard
// notification rule
// notification endpoint
// Permission{
// Action: ReadAction,
// Resource: Resource{
// Type: m.ResourceType,
// ID: &m.ResourceID,
// },
// },
}
return ps, nil
}