feat(task): Add new permission lookup pattern for executor (#18920)
* feat(task): Add new permission lookup pattern for executor We can now use the user service to populate task owners permissions. This should improve the task lookup time and decouple the task system from the URM system. In the future we will have the ability to better isolate tenant pieces from the rest of the service. * feat: add feature flaggingpull/18941/head
parent
14db46c1a0
commit
a31b651057
|
|
@ -75,3 +75,10 @@ func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error {
|
|||
}
|
||||
return s.s.DeleteUser(ctx, id)
|
||||
}
|
||||
|
||||
func (s *UserService) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInternal,
|
||||
Msg: "not implemented",
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -752,9 +752,10 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
executor, executorMetrics := executor.NewExecutor(
|
||||
m.log.With(zap.String("service", "task-executor")),
|
||||
query.QueryServiceBridge{AsyncQueryService: m.queryController},
|
||||
authSvc,
|
||||
ts.UserSvc,
|
||||
combinedTaskService,
|
||||
combinedTaskService,
|
||||
executor.WithFlagger(m.flagger),
|
||||
)
|
||||
m.executor = executor
|
||||
m.reg.MustRegister(executorMetrics.PrometheusCollectors()...)
|
||||
|
|
|
|||
|
|
@ -128,3 +128,10 @@
|
|||
default: false
|
||||
contact: Brett Buddin
|
||||
lifetime: temporary
|
||||
|
||||
- name: Use User Permission
|
||||
description: When enabled we will use the new user service permission function
|
||||
key: useUserPermission
|
||||
default: false
|
||||
contact: Lyon Hill
|
||||
lifetime: temporary
|
||||
|
|
|
|||
|
|
@ -596,6 +596,13 @@ func (s *UserService) DeleteUser(ctx context.Context, id influxdb.ID) error {
|
|||
Do(ctx)
|
||||
}
|
||||
|
||||
func (s *UserService) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInternal,
|
||||
Msg: "not implemented",
|
||||
}
|
||||
}
|
||||
|
||||
// PasswordService is an http client to speak to the password service.
|
||||
type PasswordService struct {
|
||||
Client *httpc.Client
|
||||
|
|
|
|||
|
|
@ -240,6 +240,20 @@ func SimpleTaskOptionsExtraction() BoolFlag {
|
|||
return simpleTaskOptionsExtraction
|
||||
}
|
||||
|
||||
var useUserPermission = MakeBoolFlag(
|
||||
"Use User Permission",
|
||||
"useUserPermission",
|
||||
"Lyon Hill",
|
||||
false,
|
||||
Temporary,
|
||||
false,
|
||||
)
|
||||
|
||||
// UseUserPermission - When enabled we will use the new user service permission function
|
||||
func UseUserPermission() BoolFlag {
|
||||
return useUserPermission
|
||||
}
|
||||
|
||||
var all = []Flag{
|
||||
appMetrics,
|
||||
backendExample,
|
||||
|
|
@ -258,6 +272,7 @@ var all = []Flag{
|
|||
memoryOptimizedSchemaMutation,
|
||||
urmFreeTasks,
|
||||
simpleTaskOptionsExtraction,
|
||||
useUserPermission,
|
||||
}
|
||||
|
||||
var byKey = map[string]Flag{
|
||||
|
|
@ -278,4 +293,5 @@ var byKey = map[string]Flag{
|
|||
"memoryOptimizedSchemaMutation": memoryOptimizedSchemaMutation,
|
||||
"urmFreeTasks": urmFreeTasks,
|
||||
"simpleTaskOptionsExtraction": simpleTaskOptionsExtraction,
|
||||
"useUserPermission": useUserPermission,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,105 +0,0 @@
|
|||
package kv_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/tenant"
|
||||
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func initBoltDuplicateReadBucketService(f influxdbtesting.BucketFields, t *testing.T) (influxdb.BucketService, string, func()) {
|
||||
s, closeInMem, err := NewTestInmemStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, op, closeSvc := initBucketService(s, f, t)
|
||||
ro := tenant.NewReadOnlyStore(s)
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadBucketService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, op, func() {
|
||||
closeSvc()
|
||||
closeInMem()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadBucketService(t *testing.T) {
|
||||
influxdbtesting.BucketService(initBoltDuplicateReadBucketService, t)
|
||||
}
|
||||
|
||||
func initBoltDuplicateReadOrganizationService(f influxdbtesting.OrganizationFields, t *testing.T) (influxdb.OrganizationService, string, func()) {
|
||||
s, closeBolt, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, op, closeSvc := initOrganizationService(s, f, t)
|
||||
ro := tenant.NewReadOnlyStore(s)
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadOrganizationService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, op, func() {
|
||||
closeSvc()
|
||||
closeBolt()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadOrganizationService(t *testing.T) {
|
||||
influxdbtesting.OrganizationService(initBoltDuplicateReadOrganizationService, t)
|
||||
}
|
||||
|
||||
func initBoltDuplicateReadUserResourceMappingService(f influxdbtesting.UserResourceFields, t *testing.T) (influxdb.UserResourceMappingService, func()) {
|
||||
s, closeBolt, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, closeSvc := initUserResourceMappingService(s, f, t)
|
||||
ro := tenant.NewReadOnlyStore(s)
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadUserResourceMappingService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, func() {
|
||||
closeSvc()
|
||||
closeBolt()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadUserResourceMappingService(t *testing.T) {
|
||||
influxdbtesting.UserResourceMappingService(initBoltDuplicateReadUserResourceMappingService, t)
|
||||
}
|
||||
|
||||
func initBoltDuplicateReadUserService(f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) {
|
||||
s, closeBolt, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, op, closeSvc := initUserService(s, f, t)
|
||||
ro := tenant.NewReadOnlyStore(s)
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadUserService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, op, func() {
|
||||
closeSvc()
|
||||
closeBolt()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadUserService(t *testing.T) {
|
||||
influxdbtesting.UserService(initBoltDuplicateReadUserService, t)
|
||||
}
|
||||
|
||||
func initBoltDuplicateReadPasswordsService(f influxdbtesting.PasswordFields, t *testing.T) (influxdb.PasswordsService, func()) {
|
||||
s, closeStore, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, closeSvc := initPasswordsService(s, f, t)
|
||||
ro := tenant.NewReadOnlyStore(s)
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadPasswordsService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, func() {
|
||||
closeSvc()
|
||||
closeStore()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadPasswordService(t *testing.T) {
|
||||
influxdbtesting.PasswordsService(initBoltDuplicateReadPasswordsService, t)
|
||||
}
|
||||
|
|
@ -442,6 +442,13 @@ func (s *Service) deleteUser(ctx context.Context, tx Tx, id influxdb.ID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInternal,
|
||||
Msg: "not implemented",
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) deleteUsersAuthorizations(ctx context.Context, tx Tx, id influxdb.ID) error {
|
||||
authFilter := influxdb.AuthorizationFilter{
|
||||
UserID: &id,
|
||||
|
|
|
|||
|
|
@ -12,12 +12,13 @@ var _ platform.UserService = (*UserService)(nil)
|
|||
// also makes it a suitable mock to use wherever an platform.UserService is required.
|
||||
type UserService struct {
|
||||
// Methods for a platform.UserService
|
||||
FindUserByIDFn func(context.Context, platform.ID) (*platform.User, error)
|
||||
FindUsersFn func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error)
|
||||
CreateUserFn func(context.Context, *platform.User) error
|
||||
DeleteUserFn func(context.Context, platform.ID) error
|
||||
FindUserFn func(context.Context, platform.UserFilter) (*platform.User, error)
|
||||
UpdateUserFn func(context.Context, platform.ID, platform.UserUpdate) (*platform.User, error)
|
||||
FindUserByIDFn func(context.Context, platform.ID) (*platform.User, error)
|
||||
FindUsersFn func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error)
|
||||
CreateUserFn func(context.Context, *platform.User) error
|
||||
DeleteUserFn func(context.Context, platform.ID) error
|
||||
FindUserFn func(context.Context, platform.UserFilter) (*platform.User, error)
|
||||
UpdateUserFn func(context.Context, platform.ID, platform.UserUpdate) (*platform.User, error)
|
||||
FindPermissionForUserFn func(context.Context, platform.ID) (platform.PermissionSet, error)
|
||||
}
|
||||
|
||||
// NewUserService returns a mock of UserService where its methods will return zero values.
|
||||
|
|
@ -31,6 +32,7 @@ func NewUserService() *UserService {
|
|||
FindUsersFn: func(context.Context, platform.UserFilter, ...platform.FindOptions) ([]*platform.User, int, error) {
|
||||
return nil, 0, nil
|
||||
},
|
||||
FindPermissionForUserFn: func(context.Context, platform.ID) (platform.PermissionSet, error) { return nil, nil },
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -63,3 +65,7 @@ func (s *UserService) FindUser(ctx context.Context, filter platform.UserFilter)
|
|||
func (s *UserService) UpdateUser(ctx context.Context, id platform.ID, upd platform.UserUpdate) (*platform.User, error) {
|
||||
return s.UpdateUserFn(ctx, id, upd)
|
||||
}
|
||||
|
||||
func (s *UserService) FindPermissionForUser(ctx context.Context, uid platform.ID) (platform.PermissionSet, error) {
|
||||
return s.FindPermissionForUserFn(ctx, uid)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/influxdata/flux/runtime"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
icontext "github.com/influxdata/influxdb/v2/context"
|
||||
"github.com/influxdata/influxdb/v2/kit/feature"
|
||||
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||
"github.com/influxdata/influxdb/v2/query"
|
||||
"github.com/influxdata/influxdb/v2/task/backend"
|
||||
|
|
@ -25,6 +26,10 @@ const (
|
|||
|
||||
var _ scheduler.Executor = (*Executor)(nil)
|
||||
|
||||
type PermissionService interface {
|
||||
FindPermissionForUser(ctx context.Context, UserID influxdb.ID) (influxdb.PermissionSet, error)
|
||||
}
|
||||
|
||||
type Promise interface {
|
||||
ID() influxdb.ID
|
||||
Cancel(ctx context.Context)
|
||||
|
|
@ -51,6 +56,7 @@ type executorConfig struct {
|
|||
maxWorkers int
|
||||
systemBuildCompiler CompilerBuilderFunc
|
||||
nonSystemBuildCompiler CompilerBuilderFunc
|
||||
flagger feature.Flagger
|
||||
}
|
||||
|
||||
type executorOption func(*executorConfig)
|
||||
|
|
@ -83,8 +89,15 @@ func WithNonSystemCompilerBuilder(builder CompilerBuilderFunc) executorOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithFlagger is an Executor option that allows us to use a feature flagger in the executor
|
||||
func WithFlagger(flagger feature.Flagger) executorOption {
|
||||
return func(o *executorConfig) {
|
||||
o.flagger = flagger
|
||||
}
|
||||
}
|
||||
|
||||
// NewExecutor creates a new task executor
|
||||
func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) {
|
||||
func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics) {
|
||||
cfg := &executorConfig{
|
||||
maxWorkers: defaultMaxWorkers,
|
||||
systemBuildCompiler: NewASTCompiler,
|
||||
|
|
@ -99,7 +112,7 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.Authorizati
|
|||
ts: ts,
|
||||
tcs: tcs,
|
||||
qs: qs,
|
||||
as: as,
|
||||
ps: us,
|
||||
|
||||
currentPromises: sync.Map{},
|
||||
promiseQueue: make(chan *promise, maxPromises),
|
||||
|
|
@ -107,6 +120,7 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.Authorizati
|
|||
limitFunc: func(*influxdb.Task, *influxdb.Run) error { return nil }, // noop
|
||||
systemBuildCompiler: cfg.systemBuildCompiler,
|
||||
nonSystemBuildCompiler: cfg.nonSystemBuildCompiler,
|
||||
flagger: cfg.flagger,
|
||||
}
|
||||
|
||||
e.metrics = NewExecutorMetrics(e)
|
||||
|
|
@ -126,7 +140,7 @@ type Executor struct {
|
|||
tcs backend.TaskControlService
|
||||
|
||||
qs query.QueryService
|
||||
as influxdb.AuthorizationService
|
||||
ps PermissionService
|
||||
|
||||
metrics *ExecutorMetrics
|
||||
|
||||
|
|
@ -144,6 +158,7 @@ type Executor struct {
|
|||
|
||||
nonSystemBuildCompiler CompilerBuilderFunc
|
||||
systemBuildCompiler CompilerBuilderFunc
|
||||
flagger feature.Flagger
|
||||
}
|
||||
|
||||
// SetLimitFunc sets the limit func for this task executor
|
||||
|
|
@ -276,16 +291,31 @@ func (e *Executor) createPromise(ctx context.Context, run *influxdb.Run) (*promi
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !t.Authorization.GetUserID().Valid() {
|
||||
t.Authorization.UserID = t.OwnerID
|
||||
|
||||
var perm influxdb.PermissionSet
|
||||
if e.flagger != nil && feature.UseUserPermission().Enabled(ctx, e.flagger) {
|
||||
perm, err = e.ps.FindPermissionForUser(ctx, t.OwnerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if perm == nil {
|
||||
perm = t.Authorization.Permissions
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
// create promise
|
||||
p := &promise{
|
||||
run: run,
|
||||
task: t,
|
||||
auth: t.Authorization,
|
||||
run: run,
|
||||
task: t,
|
||||
auth: &influxdb.Authorization{
|
||||
Status: influxdb.Active,
|
||||
UserID: t.OwnerID,
|
||||
ID: influxdb.ID(1),
|
||||
OrgID: t.OrganizationID,
|
||||
Permissions: perm,
|
||||
},
|
||||
createdAt: time.Now().UTC(),
|
||||
done: make(chan struct{}),
|
||||
ctx: ctx,
|
||||
|
|
@ -442,7 +472,7 @@ func (w *worker) executeQuery(p *promise) {
|
|||
// start
|
||||
w.start(p)
|
||||
|
||||
ctx = icontext.SetAuthorizer(ctx, p.task.Authorization)
|
||||
ctx = icontext.SetAuthorizer(ctx, p.auth)
|
||||
|
||||
buildCompiler := w.systemBuildCompiler
|
||||
if p.task.Type != influxdb.TaskSystemType {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
icontext "github.com/influxdata/influxdb/v2/context"
|
||||
|
|
@ -22,6 +23,7 @@ import (
|
|||
"github.com/influxdata/influxdb/v2/query"
|
||||
"github.com/influxdata/influxdb/v2/query/fluxlang"
|
||||
"github.com/influxdata/influxdb/v2/task/backend"
|
||||
"github.com/influxdata/influxdb/v2/task/backend/executor/mock"
|
||||
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/uber/jaeger-client-go"
|
||||
|
|
@ -62,13 +64,16 @@ func taskExecutorSystem(t *testing.T) tes {
|
|||
if err := all.Up(ctx, logger, store); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
ps := mock.NewMockPermissionService(ctrl)
|
||||
ps.EXPECT().FindPermissionForUser(gomock.Any(), gomock.Any()).Return(influxdb.PermissionSet{}, nil).AnyTimes()
|
||||
var (
|
||||
svc = kv.NewService(logger, store, kv.ServiceConfig{
|
||||
FluxLanguageService: fluxlang.DefaultService,
|
||||
})
|
||||
|
||||
tcs = &taskControlService{TaskControlService: svc}
|
||||
ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, svc, svc, tcs)
|
||||
ex, metrics = NewExecutor(zaptest.NewLogger(t), qs, ps, svc, tcs)
|
||||
)
|
||||
return tes{
|
||||
svc: aqs,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,127 @@
|
|||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: executor.go
|
||||
|
||||
// Package mock is a generated GoMock package.
|
||||
package mock
|
||||
|
||||
import (
|
||||
context "context"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
influxdb "github.com/influxdata/influxdb/v2"
|
||||
reflect "reflect"
|
||||
)
|
||||
|
||||
// MockPermissionService is a mock of PermissionService interface
|
||||
type MockPermissionService struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockPermissionServiceMockRecorder
|
||||
}
|
||||
|
||||
// MockPermissionServiceMockRecorder is the mock recorder for MockPermissionService
|
||||
type MockPermissionServiceMockRecorder struct {
|
||||
mock *MockPermissionService
|
||||
}
|
||||
|
||||
// NewMockPermissionService creates a new mock instance
|
||||
func NewMockPermissionService(ctrl *gomock.Controller) *MockPermissionService {
|
||||
mock := &MockPermissionService{ctrl: ctrl}
|
||||
mock.recorder = &MockPermissionServiceMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockPermissionService) EXPECT() *MockPermissionServiceMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// FindPermissionForUser mocks base method
|
||||
func (m *MockPermissionService) FindPermissionForUser(ctx context.Context, UserID influxdb.ID) (influxdb.PermissionSet, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "FindPermissionForUser", ctx, UserID)
|
||||
ret0, _ := ret[0].(influxdb.PermissionSet)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// FindPermissionForUser indicates an expected call of FindPermissionForUser
|
||||
func (mr *MockPermissionServiceMockRecorder) FindPermissionForUser(ctx, UserID interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindPermissionForUser", reflect.TypeOf((*MockPermissionService)(nil).FindPermissionForUser), ctx, UserID)
|
||||
}
|
||||
|
||||
// MockPromise is a mock of Promise interface
|
||||
type MockPromise struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockPromiseMockRecorder
|
||||
}
|
||||
|
||||
// MockPromiseMockRecorder is the mock recorder for MockPromise
|
||||
type MockPromiseMockRecorder struct {
|
||||
mock *MockPromise
|
||||
}
|
||||
|
||||
// NewMockPromise creates a new mock instance
|
||||
func NewMockPromise(ctrl *gomock.Controller) *MockPromise {
|
||||
mock := &MockPromise{ctrl: ctrl}
|
||||
mock.recorder = &MockPromiseMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockPromise) EXPECT() *MockPromiseMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// ID mocks base method
|
||||
func (m *MockPromise) ID() influxdb.ID {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ID")
|
||||
ret0, _ := ret[0].(influxdb.ID)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ID indicates an expected call of ID
|
||||
func (mr *MockPromiseMockRecorder) ID() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockPromise)(nil).ID))
|
||||
}
|
||||
|
||||
// Cancel mocks base method
|
||||
func (m *MockPromise) Cancel(ctx context.Context) {
|
||||
m.ctrl.T.Helper()
|
||||
m.ctrl.Call(m, "Cancel", ctx)
|
||||
}
|
||||
|
||||
// Cancel indicates an expected call of Cancel
|
||||
func (mr *MockPromiseMockRecorder) Cancel(ctx interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cancel", reflect.TypeOf((*MockPromise)(nil).Cancel), ctx)
|
||||
}
|
||||
|
||||
// Done mocks base method
|
||||
func (m *MockPromise) Done() <-chan struct{} {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Done")
|
||||
ret0, _ := ret[0].(<-chan struct{})
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Done indicates an expected call of Done
|
||||
func (mr *MockPromiseMockRecorder) Done() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockPromise)(nil).Done))
|
||||
}
|
||||
|
||||
// Error mocks base method
|
||||
func (m *MockPromise) Error() error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Error")
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Error indicates an expected call of Error
|
||||
func (mr *MockPromiseMockRecorder) Error() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockPromise)(nil).Error))
|
||||
}
|
||||
|
|
@ -1,344 +0,0 @@
|
|||
package tenant
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var _ influxdb.TenantService = (*tenantService)(nil)
|
||||
|
||||
var bucketCmpOptions = cmp.Options{
|
||||
cmp.Transformer("Sort", func(in []*influxdb.Bucket) []*influxdb.Bucket {
|
||||
out := append([]*influxdb.Bucket(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].Name > out[j].Name
|
||||
})
|
||||
return out
|
||||
}),
|
||||
}
|
||||
|
||||
// readOnlyStore is a wrapper for kv.Store that ensures that updates are not applied.
|
||||
type readOnlyStore struct {
|
||||
kv.Store
|
||||
}
|
||||
|
||||
func (r readOnlyStore) Update(context.Context, func(kv.Tx) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type tenantService struct {
|
||||
log *zap.Logger
|
||||
|
||||
oldBucketSvc influxdb.BucketService
|
||||
newBucketSvc influxdb.BucketService
|
||||
oldOrgSvc influxdb.OrganizationService
|
||||
newOrgSvc influxdb.OrganizationService
|
||||
oldURMSvc influxdb.UserResourceMappingService
|
||||
newURMSvc influxdb.UserResourceMappingService
|
||||
oldUserSvc influxdb.UserService
|
||||
newUserSvc influxdb.UserService
|
||||
oldPwdSvc influxdb.PasswordsService
|
||||
newPwdSvc influxdb.PasswordsService
|
||||
}
|
||||
|
||||
// NewReadOnlyStore returns a Store that cannot update the underlying kv.Store.
|
||||
func NewReadOnlyStore(store kv.Store) *Store {
|
||||
return NewStore(readOnlyStore{Store: store})
|
||||
}
|
||||
|
||||
// NewDuplicateReadTenantService returns a tenant service that duplicates the reads to oldSvc and newSvc.
|
||||
// The foreseen use case is to compare two service versions, an old one and a new one.
|
||||
// The resulting influxdb.TenantService:
|
||||
// - forwards writes to the old service;
|
||||
// - reads from the old one, if no error is encountered, it reads from the new one;
|
||||
// - compares the results obtained and logs the difference, if any.
|
||||
func NewDuplicateReadTenantService(log *zap.Logger, oldSvc, newSvc influxdb.TenantService) influxdb.TenantService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldBucketSvc: oldSvc,
|
||||
oldOrgSvc: oldSvc,
|
||||
oldURMSvc: oldSvc,
|
||||
oldUserSvc: oldSvc,
|
||||
oldPwdSvc: oldSvc,
|
||||
|
||||
newBucketSvc: newSvc,
|
||||
newOrgSvc: newSvc,
|
||||
newURMSvc: newSvc,
|
||||
newUserSvc: newSvc,
|
||||
newPwdSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadBucketService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadBucketService(log *zap.Logger, oldSvc, newSvc influxdb.BucketService) influxdb.BucketService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldBucketSvc: oldSvc,
|
||||
newBucketSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadOrganizationService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadOrganizationService(log *zap.Logger, oldSvc, newSvc influxdb.OrganizationService) influxdb.OrganizationService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldOrgSvc: oldSvc,
|
||||
newOrgSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadUserResourceMappingService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadUserResourceMappingService(log *zap.Logger, oldSvc, newSvc influxdb.UserResourceMappingService) influxdb.UserResourceMappingService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldURMSvc: oldSvc,
|
||||
newURMSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadUserService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadUserService(log *zap.Logger, oldSvc, newSvc influxdb.UserService) influxdb.UserService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldUserSvc: oldSvc,
|
||||
newUserSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadPasswordsService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadPasswordsService(log *zap.Logger, oldSvc, newSvc influxdb.PasswordsService) influxdb.PasswordsService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldPwdSvc: oldSvc,
|
||||
newPwdSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
func (s tenantService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
||||
o, err := s.oldBucketSvc.FindBucketByID(ctx, id)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newBucketSvc.FindBucketByID(ctx, id)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByID"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
|
||||
o, err := s.oldBucketSvc.FindBucketByName(ctx, orgID, name)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newBucketSvc.FindBucketByName(ctx, orgID, name)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByName"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
||||
o, err := s.oldBucketSvc.FindBucket(ctx, filter)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newBucketSvc.FindBucket(ctx, filter)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucket"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
||||
o, no, err := s.oldBucketSvc.FindBuckets(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return o, no, err
|
||||
}
|
||||
n, _, err := s.newBucketSvc.FindBuckets(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n, bucketCmpOptions); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBuckets"))
|
||||
}
|
||||
return o, no, nil
|
||||
}
|
||||
|
||||
func (s tenantService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||
return s.oldBucketSvc.CreateBucket(ctx, b)
|
||||
}
|
||||
|
||||
func (s tenantService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
|
||||
return s.oldBucketSvc.UpdateBucket(ctx, id, upd)
|
||||
}
|
||||
|
||||
func (s tenantService) DeleteBucket(ctx context.Context, id influxdb.ID) error {
|
||||
return s.oldBucketSvc.DeleteBucket(ctx, id)
|
||||
}
|
||||
|
||||
func (s tenantService) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) {
|
||||
o, err := s.oldOrgSvc.FindOrganizationByID(ctx, id)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newOrgSvc.FindOrganizationByID(ctx, id)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizationByID"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindOrganization(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
|
||||
o, err := s.oldOrgSvc.FindOrganization(ctx, filter)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newOrgSvc.FindOrganization(ctx, filter)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganization"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) {
|
||||
o, no, err := s.oldOrgSvc.FindOrganizations(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return o, no, err
|
||||
}
|
||||
n, _, err := s.newOrgSvc.FindOrganizations(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizations"))
|
||||
}
|
||||
return o, no, nil
|
||||
}
|
||||
|
||||
func (s tenantService) CreateOrganization(ctx context.Context, b *influxdb.Organization) error {
|
||||
return s.oldOrgSvc.CreateOrganization(ctx, b)
|
||||
}
|
||||
|
||||
func (s tenantService) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) {
|
||||
return s.oldOrgSvc.UpdateOrganization(ctx, id, upd)
|
||||
}
|
||||
|
||||
func (s tenantService) DeleteOrganization(ctx context.Context, id influxdb.ID) error {
|
||||
return s.oldOrgSvc.DeleteOrganization(ctx, id)
|
||||
}
|
||||
|
||||
func (s tenantService) FindUserResourceMappings(ctx context.Context, filter influxdb.UserResourceMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.UserResourceMapping, int, error) {
|
||||
o, no, err := s.oldURMSvc.FindUserResourceMappings(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return o, no, err
|
||||
}
|
||||
n, _, err := s.newURMSvc.FindUserResourceMappings(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserResourceMappings"))
|
||||
}
|
||||
return o, no, nil
|
||||
}
|
||||
|
||||
func (s tenantService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
|
||||
return s.oldURMSvc.CreateUserResourceMapping(ctx, m)
|
||||
}
|
||||
|
||||
func (s tenantService) DeleteUserResourceMapping(ctx context.Context, resourceID, userID influxdb.ID) error {
|
||||
return s.oldURMSvc.DeleteUserResourceMapping(ctx, resourceID, userID)
|
||||
}
|
||||
|
||||
func (s tenantService) FindUserByID(ctx context.Context, id influxdb.ID) (*influxdb.User, error) {
|
||||
o, err := s.oldUserSvc.FindUserByID(ctx, id)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newUserSvc.FindUserByID(ctx, id)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserByID"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindUser(ctx context.Context, filter influxdb.UserFilter) (*influxdb.User, error) {
|
||||
o, err := s.oldUserSvc.FindUser(ctx, filter)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newUserSvc.FindUser(ctx, filter)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUser"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindUsers(ctx context.Context, filter influxdb.UserFilter, opt ...influxdb.FindOptions) ([]*influxdb.User, int, error) {
|
||||
o, no, err := s.oldUserSvc.FindUsers(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return o, no, err
|
||||
}
|
||||
n, _, err := s.newUserSvc.FindUsers(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUsers"))
|
||||
}
|
||||
return o, no, nil
|
||||
}
|
||||
|
||||
func (s tenantService) CreateUser(ctx context.Context, u *influxdb.User) error {
|
||||
return s.oldUserSvc.CreateUser(ctx, u)
|
||||
}
|
||||
|
||||
func (s tenantService) UpdateUser(ctx context.Context, id influxdb.ID, upd influxdb.UserUpdate) (*influxdb.User, error) {
|
||||
return s.oldUserSvc.UpdateUser(ctx, id, upd)
|
||||
}
|
||||
|
||||
func (s tenantService) DeleteUser(ctx context.Context, id influxdb.ID) error {
|
||||
return s.oldUserSvc.DeleteUser(ctx, id)
|
||||
}
|
||||
|
||||
func (s tenantService) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
|
||||
return s.oldPwdSvc.SetPassword(ctx, userID, password)
|
||||
}
|
||||
|
||||
func (s tenantService) ComparePassword(ctx context.Context, userID influxdb.ID, password string) error {
|
||||
if err := s.oldPwdSvc.ComparePassword(ctx, userID, password); err != nil {
|
||||
return err
|
||||
}
|
||||
err := s.newPwdSvc.ComparePassword(ctx, userID, password)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s tenantService) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, old, new string) error {
|
||||
return s.oldPwdSvc.CompareAndSetPassword(ctx, userID, old, new)
|
||||
}
|
||||
|
|
@ -126,6 +126,19 @@ func (s *UserClientService) DeleteUser(ctx context.Context, id influxdb.ID) erro
|
|||
Do(ctx)
|
||||
}
|
||||
|
||||
// FindUserByID returns a single user by ID.
|
||||
func (s *UserClientService) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
|
||||
var ps influxdb.PermissionSet
|
||||
err := s.Client.
|
||||
Get(prefixUsers, id.String(), "permissions").
|
||||
DecodeJSON(&ps).
|
||||
Do(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
// PasswordClientService is an http client to speak to the password service.
|
||||
type PasswordClientService struct {
|
||||
Client *httpc.Client
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ func NewHTTPUserHandler(log *zap.Logger, userService influxdb.UserService, passw
|
|||
r.Get("/", svr.handleGetUser)
|
||||
r.Patch("/", svr.handlePatchUser)
|
||||
r.Delete("/", svr.handleDeleteUser)
|
||||
r.Get("/permissions", svr.handleGetPermissions)
|
||||
r.Put("/password", svr.handlePutUserPassword)
|
||||
r.Post("/password", svr.handlePostUserPassword)
|
||||
})
|
||||
|
|
@ -257,6 +258,31 @@ func (h *UserHandler) handleGetUser(w http.ResponseWriter, r *http.Request) {
|
|||
h.api.Respond(w, r, http.StatusOK, newUserResponse(b))
|
||||
}
|
||||
|
||||
func (h *UserHandler) handleGetPermissions(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
if id == "" {
|
||||
err := &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: "url missing id",
|
||||
}
|
||||
h.api.Err(w, r, err)
|
||||
return
|
||||
}
|
||||
var i influxdb.ID
|
||||
if err := i.DecodeFromString(id); err != nil {
|
||||
h.api.Err(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
ps, err := h.userSvc.FindPermissionForUser(r.Context(), i)
|
||||
if err != nil {
|
||||
h.api.Err(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
h.api.Respond(w, r, http.StatusOK, ps)
|
||||
}
|
||||
|
||||
type getUserRequest struct {
|
||||
UserID influxdb.ID
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,6 +79,13 @@ func (s *AuthedUserService) DeleteUser(ctx context.Context, id influxdb.ID) erro
|
|||
return s.s.DeleteUser(ctx, id)
|
||||
}
|
||||
|
||||
func (s *AuthedUserService) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
|
||||
if _, _, err := authorizer.AuthorizeReadResource(ctx, influxdb.UsersResourceType, id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.s.FindPermissionForUser(ctx, id)
|
||||
}
|
||||
|
||||
// AuthedPasswordService is a new authorization middleware for a password service.
|
||||
type AuthedPasswordService struct {
|
||||
s influxdb.PasswordsService
|
||||
|
|
|
|||
|
|
@ -99,6 +99,19 @@ func (l *UserLogger) DeleteUser(ctx context.Context, id influxdb.ID) (err error)
|
|||
return l.userService.DeleteUser(ctx, id)
|
||||
}
|
||||
|
||||
func (l *UserLogger) FindPermissionForUser(ctx context.Context, id influxdb.ID) (ps influxdb.PermissionSet, err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failed to delete user with ID %v", id)
|
||||
l.logger.Debug(msg, zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Debug("find permission for user", dur)
|
||||
}(time.Now())
|
||||
return l.userService.FindPermissionForUser(ctx, id)
|
||||
}
|
||||
|
||||
type PasswordLogger struct {
|
||||
logger *zap.Logger
|
||||
pwdService influxdb.PasswordsService
|
||||
|
|
|
|||
|
|
@ -63,6 +63,12 @@ func (m *UserMetrics) DeleteUser(ctx context.Context, id influxdb.ID) error {
|
|||
return rec(err)
|
||||
}
|
||||
|
||||
func (m *UserMetrics) FindPermissionForUser(ctx context.Context, id influxdb.ID) (influxdb.PermissionSet, error) {
|
||||
rec := m.rec.Record("find_permission_for_user")
|
||||
ps, err := m.userService.FindPermissionForUser(ctx, id)
|
||||
return ps, rec(err)
|
||||
}
|
||||
|
||||
type PasswordMetrics struct {
|
||||
// RED metrics
|
||||
rec *metric.REDClient
|
||||
|
|
|
|||
|
|
@ -60,3 +60,19 @@ func (s *Service) removeResourceRelations(ctx context.Context, tx kv.Tx, resourc
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func permissionFromMapping(mappings []*influxdb.UserResourceMapping) ([]influxdb.Permission, error) {
|
||||
ps := make([]influxdb.Permission, 0, len(mappings))
|
||||
for _, m := range mappings {
|
||||
p, err := m.ToPermissions()
|
||||
if err != nil {
|
||||
return nil, &influxdb.Error{
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
ps = append(ps, p...)
|
||||
}
|
||||
|
||||
return ps, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -131,6 +131,38 @@ func (s *Service) DeleteUser(ctx context.Context, id influxdb.ID) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// FindPermissionForUser gets the full set of permission for a specified user id
|
||||
func (s *Service) FindPermissionForUser(ctx context.Context, uid influxdb.ID) (influxdb.PermissionSet, error) {
|
||||
mappings, _, err := s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Limit: 100})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
permissions, err := permissionFromMapping(mappings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(mappings) >= 100 {
|
||||
// if we got 100 mappings we probably need to pull more pages
|
||||
// account for paginated results
|
||||
for i := len(mappings); len(mappings) > 0; i += len(mappings) {
|
||||
mappings, _, err = s.FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{UserID: uid}, influxdb.FindOptions{Offset: i, Limit: 100})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pms, err := permissionFromMapping(mappings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
permissions = append(permissions, pms...)
|
||||
}
|
||||
}
|
||||
|
||||
permissions = append(permissions, influxdb.MePermissions(uid)...)
|
||||
return permissions, nil
|
||||
}
|
||||
|
||||
// SetPassword overrides the password of a known user.
|
||||
func (s *Service) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
|
||||
if len(password) < 8 {
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kv"
|
||||
"github.com/influxdata/influxdb/v2/tenant"
|
||||
|
|
@ -87,3 +88,77 @@ func initPasswordsService(s kv.Store, f influxdbtesting.PasswordFields, t *testi
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindPermissionsFromUser(t *testing.T) {
|
||||
s, _, err := NewTestInmemStore(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
storage := tenant.NewStore(s)
|
||||
svc := tenant.NewService(storage)
|
||||
|
||||
// createUser
|
||||
u := &influxdb.User{
|
||||
Name: "rockstar",
|
||||
Status: influxdb.Active,
|
||||
}
|
||||
|
||||
if err := svc.CreateUser(context.Background(), u); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// createSomeURMS
|
||||
err = svc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
|
||||
UserID: u.ID,
|
||||
UserType: influxdb.Member,
|
||||
ResourceType: influxdb.OrgsResourceType,
|
||||
ResourceID: 1,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = svc.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
|
||||
UserID: u.ID,
|
||||
UserType: influxdb.Owner,
|
||||
ResourceType: influxdb.BucketsResourceType,
|
||||
ResourceID: 2,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// pull the permisssions for this user
|
||||
perms, err := svc.FindPermissionForUser(ctx, u.ID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
orgID := influxdb.ID(1)
|
||||
expected := influxdb.PermissionSet{
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.AuthorizationsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.BucketsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DashboardsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{ID: &orgID, Type: influxdb.OrgsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.SourcesResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.TasksResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.TelegrafsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.UsersResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.VariablesResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ScraperResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.SecretsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.LabelsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ViewsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DocumentsResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotificationRuleResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.NotificationEndpointResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.ChecksResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{OrgID: &orgID, Type: influxdb.DBRPResourceType}},
|
||||
influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}},
|
||||
influxdb.Permission{Action: influxdb.WriteAction, Resource: influxdb.Resource{Type: influxdb.UsersResourceType, ID: &u.ID}},
|
||||
}
|
||||
if !cmp.Equal(perms, expected) {
|
||||
t.Fatalf("inequal response for find params %+v", cmp.Diff(perms, expected))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
3
user.go
3
user.go
|
|
@ -61,6 +61,9 @@ type UserService interface {
|
|||
|
||||
// Removes a user by ID.
|
||||
DeleteUser(ctx context.Context, id ID) error
|
||||
|
||||
// FindPermissionForUser
|
||||
FindPermissionForUser(ctx context.Context, UserID ID) (PermissionSet, error)
|
||||
}
|
||||
|
||||
// UserUpdate represents updates to a user.
|
||||
|
|
|
|||
|
|
@ -143,28 +143,57 @@ type UserResourceMappingFilter struct {
|
|||
}
|
||||
|
||||
func (m *UserResourceMapping) ownerPerms() ([]Permission, error) {
|
||||
ps := []Permission{}
|
||||
// TODO(desa): how to grant access to specific resources.
|
||||
|
||||
if m.ResourceType == OrgsResourceType {
|
||||
ps = append(ps, OwnerPermissions(m.ResourceID)...)
|
||||
return OwnerPermissions(m.ResourceID), nil
|
||||
}
|
||||
|
||||
ps := []Permission{
|
||||
// TODO: Uncomment these once the URM system is no longer being used for find lookups for:
|
||||
// Telegraf
|
||||
// DashBoard
|
||||
// notification rule
|
||||
// notification endpoint
|
||||
// Permission{
|
||||
// Action: ReadAction,
|
||||
// Resource: Resource{
|
||||
// Type: m.ResourceType,
|
||||
// ID: &m.ResourceID,
|
||||
// },
|
||||
// },
|
||||
// Permission{
|
||||
// Action: WriteAction,
|
||||
// Resource: Resource{
|
||||
// Type: m.ResourceType,
|
||||
// ID: &m.ResourceID,
|
||||
// },
|
||||
// },
|
||||
}
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
func (m *UserResourceMapping) memberPerms() ([]Permission, error) {
|
||||
ps := []Permission{}
|
||||
// TODO(desa): how to grant access to specific resources.
|
||||
|
||||
if m.ResourceType == OrgsResourceType {
|
||||
ps = append(ps, MemberPermissions(m.ResourceID)...)
|
||||
return MemberPermissions(m.ResourceID), nil
|
||||
}
|
||||
|
||||
if m.ResourceType == BucketsResourceType {
|
||||
ps = append(ps, MemberBucketPermission(m.ResourceID))
|
||||
return []Permission{MemberBucketPermission(m.ResourceID)}, nil
|
||||
}
|
||||
|
||||
ps := []Permission{
|
||||
// TODO: Uncomment these once the URM system is no longer being used for find lookups for:
|
||||
// Telegraf
|
||||
// DashBoard
|
||||
// notification rule
|
||||
// notification endpoint
|
||||
// Permission{
|
||||
// Action: ReadAction,
|
||||
// Resource: Resource{
|
||||
// Type: m.ResourceType,
|
||||
// ID: &m.ResourceID,
|
||||
// },
|
||||
// },
|
||||
}
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue