feat(task): Add new permission lookup pattern for executor ()

We can now use the user service to populate task owners permissions.
This should improve the task lookup time and decouple the task system
from the URM system. In the future we will have the ability to better isolate
tenant pieces from the rest of the service.
pull/18910/head
Lyon Hill 2020-07-09 14:06:46 -06:00 committed by GitHub
parent 76179a203e
commit e9e4d794cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 418 additions and 476 deletions

View File

@ -75,3 +75,10 @@ func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error {
}
return s.s.DeleteUser(ctx, id)
}
func (s *UserService) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Msg: "not implemented",
}
}

View File

@ -752,7 +752,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
executor, executorMetrics := executor.NewExecutor(
m.log.With(zap.String("service", "task-executor")),
query.QueryServiceBridge{AsyncQueryService: m.queryController},
authSvc,
ts.UserSvc,
combinedTaskService,
combinedTaskService,
)

View File

@ -596,6 +596,13 @@ func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error {
Do(ctx)
}
func (s *UserService) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Msg: "not implemented",
}
}
// PasswordService is an http client to speak to the password service.
type PasswordService struct {
Client *httpc.Client

View File

@ -1,105 +0,0 @@
package kv_test
import (
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tenant"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func initBoltDuplicateReadBucketService(f influxdbtesting.BucketFields, t *testing.T) (influxdb.BucketService, string, func()) {
s, closeInMem, err := NewTestInmemStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initBucketService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadBucketService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
closeSvc()
closeInMem()
}
}
func TestBoltDuplicateReadBucketService(t *testing.T) {
influxdbtesting.BucketService(initBoltDuplicateReadBucketService, t)
}
func initBoltDuplicateReadOrganizationService(f influxdbtesting.OrganizationFields, t *testing.T) (influxdb.OrganizationService, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initOrganizationService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadOrganizationService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
closeSvc()
closeBolt()
}
}
func TestBoltDuplicateReadOrganizationService(t *testing.T) {
influxdbtesting.OrganizationService(initBoltDuplicateReadOrganizationService, t)
}
func initBoltDuplicateReadUserResourceMappingService(f influxdbtesting.UserResourceFields, t *testing.T) (influxdb.UserResourceMappingService, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initUserResourceMappingService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadUserResourceMappingService(zaptest.NewLogger(t), svc, newSvc)
return svc, func() {
closeSvc()
closeBolt()
}
}
func TestBoltDuplicateReadUserResourceMappingService(t *testing.T) {
influxdbtesting.UserResourceMappingService(initBoltDuplicateReadUserResourceMappingService, t)
}
func initBoltDuplicateReadUserService(f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) {
s, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initUserService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadUserService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
closeSvc()
closeBolt()
}
}
func TestBoltDuplicateReadUserService(t *testing.T) {
influxdbtesting.UserService(initBoltDuplicateReadUserService, t)
}
func initBoltDuplicateReadPasswordsService(f influxdbtesting.PasswordFields, t *testing.T) (influxdb.PasswordsService, func()) {
s, closeStore, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initPasswordsService(s, f, t)
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadPasswordsService(zaptest.NewLogger(t), svc, newSvc)
return svc, func() {
closeSvc()
closeStore()
}
}
func TestBoltDuplicateReadPasswordService(t *testing.T) {
influxdbtesting.PasswordsService(initBoltDuplicateReadPasswordsService, t)
}

View File

@ -442,6 +442,13 @@ func (s *Service) deleteUser(ctx context.Context, tx Tx, id influxdb.ID) error {
return nil
}
func (s *Service) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
return nil, &influxdb.Error{
Code: influxdb.EInternal,
Msg: "not implemented",
}
}
func (s *Service) deleteUsersAuthorizations(ctx context.Context, tx Tx, id influxdb.ID) error {
authFilter := influxdb.AuthorizationFilter{
UserID: &id,

View File

@ -12,12 +12,13 @@ var _ platform.UserService = (*UserService)(nil)
// also makes it a suitable mock to use wherever an platform.UserService is required.
type UserService struct {
// Methods for a platform.UserService
FindUserByIDFn func(context.Context, platform.ID) (*platform.User, error)
FindUsersFn func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error)
CreateUserFn func(context.Context, *platform.User) error
DeleteUserFn func(context.Context, platform.ID) error
FindUserFn func(context.Context, platform.UserFilter) (*platform.User, error)
UpdateUserFn func(context.Context, platform.ID, platform.UserUpdate) (*platform.User, error)
FindUserByIDFn func(context.Context, platform.ID) (*platform.User, error)
FindUsersFn func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error)
CreateUserFn func(context.Context, *platform.User) error
DeleteUserFn func(context.Context, platform.ID) error
FindUserFn func(context.Context, platform.UserFilter) (*platform.User, error)
UpdateUserFn func(context.Context, platform.ID, platform.UserUpdate) (*platform.User, error)
FindPermissionForUserFn func(context.Context, platform.ID) (platform.PermissionSet, error)
}
// NewUserService returns a mock of UserService where its methods will return zero values.
@ -31,6 +32,7 @@ func NewUserService() *UserService {
FindUsersFn: func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error) {
return nil, 0, nil
},
FindPermissionForUserFn: func(context.Context, platform.ID) (platform.PermissionSet, error) { return nil, nil },
}
}
@ -63,3 +65,7 @@ func (s *UserService) FindUser(ctx context.Context, filter platform.UserFilter)
func (s *UserService) UpdateUser(ctx context.Context, id platform.ID, upd platform.UserUpdate) (*platform.User, error) {
return s.UpdateUserFn(ctx, id, upd)
}
func (s *UserService) FindPermissionForUser(ctx context.Context, uid platform.ID) (platform.PermissionSet, error) {
return s.FindPermissionForUserFn(ctx, uid)
}

View File

@ -25,6 +25,10 @@ const (
var _ scheduler.Executor = (*Executor)(nil)
type PermissionService interface {
FindPermissionForUser(ctx context.Context, UserID influxdb.ID) (influxdb.PermissionSet, error)
}
type Promise interface {
ID() influxdb.ID
Cancel(ctx context.Context)
@ -84,7 +88,7 @@ func WithNonSystemCompilerBuilder(builder CompilerBuilderFunc) executorOption {
}
// NewExecutor creates a new task executor
func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) {
func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) {
cfg := &executorConfig{
maxWorkers: defaultMaxWorkers,
systemBuildCompiler: NewASTCompiler,
@ -99,7 +103,7 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.Authorizati
ts: ts,
tcs: tcs,
qs: qs,
as: as,
ps: us,
currentPromises: sync.Map{},
promiseQueue: make(chan *promise, maxPromises),
@ -126,7 +130,7 @@ type Executor struct {
tcs backend.TaskControlService
qs query.QueryService
as influxdb.AuthorizationService
ps PermissionService
metrics *ExecutorMetrics
@ -276,16 +280,24 @@ func (e *Executor) createPromise(ctx context.Context, run *influxdb.Run) (*promi
if err != nil {
return nil, err
}
if !t.Authorization.GetUserID().Valid() {
t.Authorization.UserID = t.OwnerID
perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
// create promise
p := &promise{
run: run,
task: t,
auth: t.Authorization,
run: run,
task: t,
auth: &influxdb.Authorization{
Status: influxdb.Active,
UserID: t.OwnerID,
ID: influxdb.ID(1),
OrgID: t.OrganizationID,
Permissions: perm,
},
createdAt: time.Now().UTC(),
done: make(chan struct{}),
ctx: ctx,
@ -442,7 +454,7 @@ func (w *worker) executeQuery(p *promise) {
// start
w.start(p)
ctx = icontext.SetAuthorizer(ctx, p.task.Authorization)
ctx = icontext.SetAuthorizer(ctx, p.auth)
buildCompiler := w.systemBuildCompiler
if p.task.Type != influxdb.TaskSystemType {

View File

@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/influxdata/flux"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
@ -22,6 +23,7 @@ import (
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"github.com/influxdata/influxdb/v2/task/backend"
"github.com/influxdata/influxdb/v2/task/backend/executor/mock"
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
@ -62,13 +64,16 @@ func taskExecutorSystem(t *testing.T) tes {
if err := all.Up(ctx, logger, store); err != nil {
t.Fatal(err)
}
ctrl := gomock.NewController(t)
ps := mock.NewMockPermissionService(ctrl)
ps.EXPECT().FindPermissionForUser(gomock.Any(), gomock.Any()).Return(influxdb.PermissionSet{}, nil).AnyTimes()
var (
svc = kv.NewService(logger, store, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
tcs = &taskControlService{TaskControlService: svc}
ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, svc, svc, tcs)
ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, ps, svc, tcs)
)
return tes{
svc: aqs,

View File

@ -0,0 +1,127 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: executor.go
// Package mock is a generated GoMock package.
package mock
import (
context "context"
gomock "github.com/golang/mock/gomock"
influxdb "github.com/influxdata/influxdb/v2"
reflect "reflect"
)
// MockPermissionService is a mock of PermissionService interface
type MockPermissionService struct {
ctrl *gomock.Controller
recorder *MockPermissionServiceMockRecorder
}
// MockPermissionServiceMockRecorder is the mock recorder for MockPermissionService
type MockPermissionServiceMockRecorder struct {
mock *MockPermissionService
}
// NewMockPermissionService creates a new mock instance
func NewMockPermissionService(ctrl *gomock.Controller) *MockPermissionService {
mock := &MockPermissionService{ctrl: ctrl}
mock.recorder = &MockPermissionServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockPermissionService) EXPECT() *MockPermissionServiceMockRecorder {
return m.recorder
}
// FindPermissionForUser mocks base method
func (m *MockPermissionService) FindPermissionForUser(ctx context.Context, UserID influxdb.ID) (influxdb.PermissionSet, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindPermissionForUser", ctx, UserID)
ret0, _ := ret[0].(influxdb.PermissionSet)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FindPermissionForUser indicates an expected call of FindPermissionForUser
func (mr *MockPermissionServiceMockRecorder) FindPermissionForUser(ctx, UserID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindPermissionForUser", reflect.TypeOf((*MockPermissionService)(nil).FindPermissionForUser), ctx, UserID)
}
// MockPromise is a mock of Promise interface
type MockPromise struct {
ctrl *gomock.Controller
recorder *MockPromiseMockRecorder
}
// MockPromiseMockRecorder is the mock recorder for MockPromise
type MockPromiseMockRecorder struct {
mock *MockPromise
}
// NewMockPromise creates a new mock instance
func NewMockPromise(ctrl *gomock.Controller) *MockPromise {
mock := &MockPromise{ctrl: ctrl}
mock.recorder = &MockPromiseMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockPromise) EXPECT() *MockPromiseMockRecorder {
return m.recorder
}
// ID mocks base method
func (m *MockPromise) ID() influxdb.ID {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ID")
ret0, _ := ret[0].(influxdb.ID)
return ret0
}
// ID indicates an expected call of ID
func (mr *MockPromiseMockRecorder) ID() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockPromise)(nil).ID))
}
// Cancel mocks base method
func (m *MockPromise) Cancel(ctx context.Context) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Cancel", ctx)
}
// Cancel indicates an expected call of Cancel
func (mr *MockPromiseMockRecorder) Cancel(ctx interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cancel", reflect.TypeOf((*MockPromise)(nil).Cancel), ctx)
}
// Done mocks base method
func (m *MockPromise) Done() <-chan struct{} {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Done")
ret0, _ := ret[0].(<-chan struct{})
return ret0
}
// Done indicates an expected call of Done
func (mr *MockPromiseMockRecorder) Done() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockPromise)(nil).Done))
}
// Error mocks base method
func (m *MockPromise) Error() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Error")
ret0, _ := ret[0].(error)
return ret0
}
// Error indicates an expected call of Error
func (mr *MockPromiseMockRecorder) Error() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockPromise)(nil).Error))
}

View File

@ -1,344 +0,0 @@
package tenant
import (
"context"
"fmt"
"sort"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"go.uber.org/zap"
)
var _ influxdb.TenantService = (*tenantService)(nil)
var bucketCmpOptions = cmp.Options{
cmp.Transformer("Sort", func(in []*influxdb.Bucket) []*influxdb.Bucket {
out := append([]*influxdb.Bucket(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].Name > out[j].Name
})
return out
}),
}
// readOnlyStore is a wrapper for kv.Store that ensures that updates are not applied.
type readOnlyStore struct {
kv.Store
}
func (r readOnlyStore) Update(context.Context, func(kv.Tx) error) error {
return nil
}
type tenantService struct {
log *zap.Logger
oldBucketSvc influxdb.BucketService
newBucketSvc influxdb.BucketService
oldOrgSvc influxdb.OrganizationService
newOrgSvc influxdb.OrganizationService
oldURMSvc influxdb.UserResourceMappingService
newURMSvc influxdb.UserResourceMappingService
oldUserSvc influxdb.UserService
newUserSvc influxdb.UserService
oldPwdSvc influxdb.PasswordsService
newPwdSvc influxdb.PasswordsService
}
// NewReadOnlyStore returns a Store that cannot update the underlying kv.Store.
func NewReadOnlyStore(store kv.Store) *Store {
return NewStore(readOnlyStore{Store: store})
}
// NewDuplicateReadTenantService returns a tenant service that duplicates the reads to oldSvc and newSvc.
// The foreseen use case is to compare two service versions, an old one and a new one.
// The resulting influxdb.TenantService:
// - forwards writes to the old service;
// - reads from the old one, if no error is encountered, it reads from the new one;
// - compares the results obtained and logs the difference, if any.
func NewDuplicateReadTenantService(log *zap.Logger, oldSvc, newSvc influxdb.TenantService) influxdb.TenantService {
return tenantService{
log: log,
oldBucketSvc: oldSvc,
oldOrgSvc: oldSvc,
oldURMSvc: oldSvc,
oldUserSvc: oldSvc,
oldPwdSvc: oldSvc,
newBucketSvc: newSvc,
newOrgSvc: newSvc,
newURMSvc: newSvc,
newUserSvc: newSvc,
newPwdSvc: newSvc,
}
}
// NewDuplicateReadBucketService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadBucketService(log *zap.Logger, oldSvc, newSvc influxdb.BucketService) influxdb.BucketService {
return tenantService{
log: log,
oldBucketSvc: oldSvc,
newBucketSvc: newSvc,
}
}
// NewDuplicateReadOrganizationService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadOrganizationService(log *zap.Logger, oldSvc, newSvc influxdb.OrganizationService) influxdb.OrganizationService {
return tenantService{
log: log,
oldOrgSvc: oldSvc,
newOrgSvc: newSvc,
}
}
// NewDuplicateReadUserResourceMappingService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadUserResourceMappingService(log *zap.Logger, oldSvc, newSvc influxdb.UserResourceMappingService) influxdb.UserResourceMappingService {
return tenantService{
log: log,
oldURMSvc: oldSvc,
newURMSvc: newSvc,
}
}
// NewDuplicateReadUserService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadUserService(log *zap.Logger, oldSvc, newSvc influxdb.UserService) influxdb.UserService {
return tenantService{
log: log,
oldUserSvc: oldSvc,
newUserSvc: newSvc,
}
}
// NewDuplicateReadPasswordsService returns a service that duplicates the reads to oldSvc and newSvc.
func NewDuplicateReadPasswordsService(log *zap.Logger, oldSvc, newSvc influxdb.PasswordsService) influxdb.PasswordsService {
return tenantService{
log: log,
oldPwdSvc: oldSvc,
newPwdSvc: newSvc,
}
}
func (s tenantService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
o, err := s.oldBucketSvc.FindBucketByID(ctx, id)
if err != nil {
return o, err
}
n, err := s.newBucketSvc.FindBucketByID(ctx, id)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByID"))
}
return o, nil
}
func (s tenantService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
o, err := s.oldBucketSvc.FindBucketByName(ctx, orgID, name)
if err != nil {
return o, err
}
n, err := s.newBucketSvc.FindBucketByName(ctx, orgID, name)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByName"))
}
return o, nil
}
func (s tenantService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
o, err := s.oldBucketSvc.FindBucket(ctx, filter)
if err != nil {
return o, err
}
n, err := s.newBucketSvc.FindBucket(ctx, filter)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucket"))
}
return o, nil
}
func (s tenantService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
o, no, err := s.oldBucketSvc.FindBuckets(ctx, filter, opt...)
if err != nil {
return o, no, err
}
n, _, err := s.newBucketSvc.FindBuckets(ctx, filter, opt...)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n, bucketCmpOptions); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBuckets"))
}
return o, no, nil
}
func (s tenantService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
return s.oldBucketSvc.CreateBucket(ctx, b)
}
func (s tenantService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
return s.oldBucketSvc.UpdateBucket(ctx, id, upd)
}
func (s tenantService) DeleteBucket(ctx context.Context, id influxdb.ID) error {
return s.oldBucketSvc.DeleteBucket(ctx, id)
}
func (s tenantService) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) {
o, err := s.oldOrgSvc.FindOrganizationByID(ctx, id)
if err != nil {
return o, err
}
n, err := s.newOrgSvc.FindOrganizationByID(ctx, id)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizationByID"))
}
return o, nil
}
func (s tenantService) FindOrganization(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
o, err := s.oldOrgSvc.FindOrganization(ctx, filter)
if err != nil {
return o, err
}
n, err := s.newOrgSvc.FindOrganization(ctx, filter)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganization"))
}
return o, nil
}
func (s tenantService) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) {
o, no, err := s.oldOrgSvc.FindOrganizations(ctx, filter, opt...)
if err != nil {
return o, no, err
}
n, _, err := s.newOrgSvc.FindOrganizations(ctx, filter, opt...)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizations"))
}
return o, no, nil
}
func (s tenantService) CreateOrganization(ctx context.Context, b *influxdb.Organization) error {
return s.oldOrgSvc.CreateOrganization(ctx, b)
}
func (s tenantService) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) {
return s.oldOrgSvc.UpdateOrganization(ctx, id, upd)
}
func (s tenantService) DeleteOrganization(ctx context.Context, id influxdb.ID) error {
return s.oldOrgSvc.DeleteOrganization(ctx, id)
}
func (s tenantService) FindUserResourceMappings(ctx context.Context, filter influxdb.UserResourceMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.UserResourceMapping, int, error) {
o, no, err := s.oldURMSvc.FindUserResourceMappings(ctx, filter, opt...)
if err != nil {
return o, no, err
}
n, _, err := s.newURMSvc.FindUserResourceMappings(ctx, filter, opt...)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserResourceMappings"))
}
return o, no, nil
}
func (s tenantService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
return s.oldURMSvc.CreateUserResourceMapping(ctx, m)
}
func (s tenantService) DeleteUserResourceMapping(ctx context.Context, resourceID, userID influxdb.ID) error {
return s.oldURMSvc.DeleteUserResourceMapping(ctx, resourceID, userID)
}
func (s tenantService) FindUserByID(ctx context.Context, id influxdb.ID) (*influxdb.User, error) {
o, err := s.oldUserSvc.FindUserByID(ctx, id)
if err != nil {
return o, err
}
n, err := s.newUserSvc.FindUserByID(ctx, id)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserByID"))
}
return o, nil
}
func (s tenantService) FindUser(ctx context.Context, filter influxdb.UserFilter) (*influxdb.User, error) {
o, err := s.oldUserSvc.FindUser(ctx, filter)
if err != nil {
return o, err
}
n, err := s.newUserSvc.FindUser(ctx, filter)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUser"))
}
return o, nil
}
func (s tenantService) FindUsers(ctx context.Context, filter influxdb.UserFilter, opt ...influxdb.FindOptions) ([]*influxdb.User, int, error) {
o, no, err := s.oldUserSvc.FindUsers(ctx, filter, opt...)
if err != nil {
return o, no, err
}
n, _, err := s.newUserSvc.FindUsers(ctx, filter, opt...)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
} else if diff := cmp.Diff(o, n); diff != "" {
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUsers"))
}
return o, no, nil
}
func (s tenantService) CreateUser(ctx context.Context, u *influxdb.User) error {
return s.oldUserSvc.CreateUser(ctx, u)
}
func (s tenantService) UpdateUser(ctx context.Context, id influxdb.ID, upd influxdb.UserUpdate) (*influxdb.User, error) {
return s.oldUserSvc.UpdateUser(ctx, id, upd)
}
func (s tenantService) DeleteUser(ctx context.Context, id influxdb.ID) error {
return s.oldUserSvc.DeleteUser(ctx, id)
}
func (s tenantService) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
return s.oldPwdSvc.SetPassword(ctx, userID, password)
}
func (s tenantService) ComparePassword(ctx context.Context, userID influxdb.ID, password string) error {
if err := s.oldPwdSvc.ComparePassword(ctx, userID, password); err != nil {
return err
}
err := s.newPwdSvc.ComparePassword(ctx, userID, password)
if err != nil {
s.log.Error("error in new meta store", zap.Error(err))
}
return nil
}
func (s tenantService) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, old, new string) error {
return s.oldPwdSvc.CompareAndSetPassword(ctx, userID, old, new)
}

View File

@ -126,6 +126,19 @@ func (s *UserClientService) DeleteUser(ctx context.Context, id influxdb.ID) erro
Do(ctx)
}
// FindUserByID returns a single user by ID.
func (s *UserClientService) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
var ps influxdb.PermissionSet
err := s.Client.
Get(prefixUsers, id.String(), "permissions").
DecodeJSON(&ps).
Do(ctx)
if err != nil {
return nil, err
}
return ps, nil
}
// PasswordClientService is an http client to speak to the password service.
type PasswordClientService struct {
Client *httpc.Client

View File

@ -55,6 +55,7 @@ func NewHTTPUserHandler(log *zap.Logger, userService influxdb.UserService, passw
r.Get("/", svr.handleGetUser)
r.Patch("/", svr.handlePatchUser)
r.Delete("/", svr.handleDeleteUser)
r.Get("/permissions", svr.handleGetPermissions)
r.Put("/password", svr.handlePutUserPassword)
r.Post("/password", svr.handlePostUserPassword)
})
@ -257,6 +258,31 @@ func (h *UserHandler) handleGetUser(w http.ResponseWriter, r *http.Request) {
h.api.Respond(w, r, http.StatusOK, newUserResponse(b))
}
func (h *UserHandler) handleGetPermissions(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if id == "" {
err := &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "url missing id",
}
h.api.Err(w, r, err)
return
}
var i influxdb.ID
if err := i.DecodeFromString(id); err != nil {
h.api.Err(w, r, err)
return
}
ps, err := h.userSvc.FindPermissionForUser(r.Context(), i)
if err != nil {
h.api.Err(w, r, err)
return
}
h.api.Respond(w, r, http.StatusOK, ps)
}
type getUserRequest struct {
UserID influxdb.ID
}

View File

@ -79,6 +79,13 @@ func (s *AuthedUserService) DeleteUser(ctx context.Context, id influxdb.ID) erro
return s.s.DeleteUser(ctx, id)
}
func (s *AuthedUserService) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
if _, _, err := authorizer.AuthorizeReadResource(ctx, influxdb.UsersResourceType, id); err != nil {
return nil, err
}
return s.s.FindPermissionForUser(ctx, id)
}
// AuthedPasswordService is a new authorization middleware for a password service.
type AuthedPasswordService struct {
s influxdb.PasswordsService

View File

@ -99,6 +99,19 @@ func (l *UserLogger) DeleteUser(ctx context.Context, id influxdb.ID) (err error)
return l.userService.DeleteUser(ctx, id)
}
func (l *UserLogger) FindPermissionForUser(ctx context.Context, id influxdb.ID) (ps influxdb.PermissionSet, err error) {
defer func(start time.Time) {
dur := zap.Duration("took", time.Since(start))
if err != nil {
msg := fmt.Sprintf("failed to delete user with ID %v", id)
l.logger.Debug(msg, zap.Error(err), dur)
return
}
l.logger.Debug("find permission for user", dur)
}(time.Now())
return l.userService.FindPermissionForUser(ctx, id)
}
type PasswordLogger struct {
logger *zap.Logger
pwdService influxdb.PasswordsService

View File

@ -63,6 +63,12 @@ func (m *UserMetrics) DeleteUser(ctx context.Context, id influxdb.ID) error {
return rec(err)
}
func (m *UserMetrics) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
rec := m.rec.Record("find_permission_for_user")
ps, err := m.userService.FindPermissionForUser(ctx, id)
return ps, rec(err)
}
type PasswordMetrics struct {
// RED metrics
rec *metric.REDClient

View File

@ -60,3 +60,19 @@ func (s *Service) removeResourceRelations(ctx context.Context, tx kv.Tx, resourc
}
return nil
}
func permissionFromMapping(mappings []*influxdb.UserResourceMapping) ([]influxdb.Permission, error) {
ps := make([]influxdb.Permission, 0, len(mappings))
for _, m := range mappings {
p, err := m.ToPermissions()
if err != nil {
return nil, &influxdb.Error{
Err: err,
}
}
ps = append(ps, p...)
}
return ps, nil
}

View File

@ -131,6 +131,38 @@ func (s *Service) DeleteUser(ctx context.Context, id influxdb.ID) error {
return err
}
// FindPermissionForUser gets the full set of permission for a specified user id
func (s *Service) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
mappings, _, err := s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Limit: 100})
if err != nil {
return nil, err
}
permissions, err := permissionFromMapping(mappings)
if err != nil {
return nil, err
}
if len(mappings) >= 100 {
// if we got 100 mappings we probably need to pull more pages
// account for paginated results
for i := len(mappings); len(mappings) > 0; i += len(mappings) {
mappings, _, err = s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Offset: i, Limit: 100})
if err != nil {
return nil, err
}
pms, err := permissionFromMapping(mappings)
if err != nil {
return nil, err
}
permissions = append(permissions, pms...)
}
}
permissions = append(permissions, influxdb.MePermissions(uid)...)
return permissions, nil
}
// SetPassword overrides the password of a known user.
func (s *Service) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
if len(password) < 8 {

View File

@ -4,6 +4,7 @@ import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/tenant"
@ -87,3 +88,77 @@ func initPasswordsService(s kv.Store, f influxdbtesting.PasswordFields, t *testi
}
}
}
func TestFindPermissionsFromUser(t *testing.T) {
s, _, err := NewTestInmemStore(t)
if err != nil {
t.Fatal(err)
}
storage := tenant.NewStore(s)
svc := tenant.NewService(storage)
// createUser
u := &influxdb.User{
Name: "rockstar",
Status: influxdb.Active,
}
if err := svc.CreateUser(context.Background(), u); err != nil {
t.Fatal(err)
}
ctx := context.Background()
// createSomeURMS
err = svc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
UserID: u.ID,
UserType: influxdb.Member,
ResourceType: influxdb.OrgsResourceType,
ResourceID: 1,
})
if err != nil {
t.Fatal(err)
}
err = svc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
UserID: u.ID,
UserType: influxdb.Owner,
ResourceType: influxdb.BucketsResourceType,
ResourceID: 2,
})
if err != nil {
t.Fatal(err)
}
// pull the permisssions for this user
perms, err := svc.FindPermissionForUser(ctx, u.ID)
if err != nil {
t.Fatal(err)
}
orgID := influxdb.ID(1)
expected := influxdb.PermissionSet{
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.AuthorizationsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.BucketsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DashboardsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{ID: &orgID, Type: influxdb.OrgsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.SourcesResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.TasksResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.TelegrafsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.UsersResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.VariablesResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ScraperResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.SecretsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.LabelsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ViewsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DocumentsResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotificationRuleResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotificationEndpointResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ChecksResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DBRPResourceType}},
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}},
influxdb.Permission{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}},
}
if !cmp.Equal(perms, expected) {
t.Fatalf("inequal response for find params %+v", cmp.Diff(perms, expected))
}
}

View File

@ -61,6 +61,9 @@ type UserService interface {
// Removes a user by ID.
DeleteUser(ctx context.Context, id ID) error
// FindPermissionForUser
FindPermissionForUser(ctx context.Context, UserID ID) (PermissionSet, error)
}
// UserUpdate represents updates to a user.

View File

@ -143,28 +143,57 @@ type UserResourceMappingFilter struct {
}
func (m *UserResourceMapping) ownerPerms() ([]Permission, error) {
ps := []Permission{}
// TODO(desa): how to grant access to specific resources.
if m.ResourceType == OrgsResourceType {
ps = append(ps, OwnerPermissions(m.ResourceID)...)
return OwnerPermissions(m.ResourceID), nil
}
ps := []Permission{
// TODO: Uncomment these once the URM system is no longer being used for find lookups for:
// Telegraf
// DashBoard
// notification rule
// notification endpoint
// Permission{
// Action: ReadAction,
// Resource: Resource{
// Type: m.ResourceType,
// ID: &m.ResourceID,
// },
// },
// Permission{
// Action: WriteAction,
// Resource: Resource{
// Type: m.ResourceType,
// ID: &m.ResourceID,
// },
// },
}
return ps, nil
}
func (m *UserResourceMapping) memberPerms() ([]Permission, error) {
ps := []Permission{}
// TODO(desa): how to grant access to specific resources.
if m.ResourceType == OrgsResourceType {
ps = append(ps, MemberPermissions(m.ResourceID)...)
return MemberPermissions(m.ResourceID), nil
}
if m.ResourceType == BucketsResourceType {
ps = append(ps, MemberBucketPermission(m.ResourceID))
return []Permission{MemberBucketPermission(m.ResourceID)}, nil
}
ps := []Permission{
// TODO: Uncomment these once the URM system is no longer being used for find lookups for:
// Telegraf
// DashBoard
// notification rule
// notification endpoint
// Permission{
// Action: ReadAction,
// Resource: Resource{
// Type: m.ResourceType,
// ID: &m.ResourceID,
// },
// },
}
return ps, nil
}