From e02e950ed017dc9f099eaaacf59200f692a6958f Mon Sep 17 00:00:00 2001 From: Alirie Gray Date: Mon, 6 Jul 2020 10:49:51 -0700 Subject: [PATCH] refactor: add new tenant system struct to consolidate services (#18833) --- cmd/influxd/launcher/launcher.go | 72 +++++++++++--------------------- tenant/service.go | 45 +++++++++++++++++++- 2 files changed, 69 insertions(+), 48 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 10ad52d64b..3405711585 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -658,15 +658,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { ) tenantStore := tenant.NewStore(m.kvStore) - ts := tenant.NewService(tenantStore) - - var ( - userSvc platform.UserService = tenant.NewUserLogger(m.log.With(zap.String("store", "new")), tenant.NewUserMetrics(m.reg, ts, metric.WithSuffix("new"))) - orgSvc platform.OrganizationService = tenant.NewOrgLogger(m.log.With(zap.String("store", "new")), tenant.NewOrgMetrics(m.reg, ts, metric.WithSuffix("new"))) - userResourceSvc platform.UserResourceMappingService = tenant.NewURMLogger(m.log.With(zap.String("store", "new")), tenant.NewUrmMetrics(m.reg, ts, metric.WithSuffix("new"))) - bucketSvc platform.BucketService = tenant.NewBucketLogger(m.log.With(zap.String("store", "new")), tenant.NewBucketMetrics(m.reg, ts, metric.WithSuffix("new"))) - passwdsSvc platform.PasswordsService = tenant.NewPasswordLogger(m.log.With(zap.String("store", "new")), tenant.NewPasswordMetrics(m.reg, ts, metric.WithSuffix("new"))) - ) + ts := tenant.NewSystem(tenantStore, m.log.With(zap.String("store", "new")), m.reg, metric.WithSuffix("new")) secretStore, err := secret.NewStore(m.kvStore) if err != nil { @@ -702,11 +694,11 @@ func (m *Launcher) run(ctx context.Context) (err error) { if m.testing { // the testing engine will write/read into a temporary directory - engine := NewTemporaryEngine(m.StorageConfig, storage.WithRetentionEnforcer(bucketSvc)) + engine := NewTemporaryEngine(m.StorageConfig, storage.WithRetentionEnforcer(ts.BucketSvc)) flushers = append(flushers, engine) m.engine = engine } else { - m.engine = storage.NewEngine(m.enginePath, m.StorageConfig, storage.WithRetentionEnforcer(bucketSvc)) + m.engine = storage.NewEngine(m.enginePath, m.StorageConfig, storage.WithRetentionEnforcer(ts.BucketSvc)) } m.engine.WithLogger(m.log) if err := m.engine.Open(ctx); err != nil { @@ -725,8 +717,8 @@ func (m *Launcher) run(ctx context.Context) (err error) { deps, err := influxdb.NewDependencies( storageflux.NewReader(readservice.NewStore(m.engine)), m.engine, - authorizer.NewBucketService(bucketSvc, userResourceSvc), - authorizer.NewOrgService(orgSvc), + authorizer.NewBucketService(ts.BucketSvc, ts.UrmSvc), + authorizer.NewOrgService(ts.OrgSvc), authorizer.NewSecretService(secretSvc), nil, ) @@ -815,7 +807,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { } } - dbrpSvc := dbrp.NewService(ctx, authorizer.NewBucketService(bucketSvc, userResourceSvc), m.kvStore) + dbrpSvc := dbrp.NewService(ctx, authorizer.NewBucketService(ts.BucketSvc, ts.UrmSvc), m.kvStore) dbrpSvc = dbrp.NewAuthorizedService(dbrpSvc) var checkSvc platform.CheckService @@ -927,8 +919,8 @@ func (m *Launcher) run(ctx context.Context) (err error) { { sessionSvc = session.NewService( session.NewStorage(inmem.NewSessionStore()), - userSvc, - userResourceSvc, + ts.UserSvc, + ts.UrmSvc, authSvc, session.WithSessionLength(time.Duration(m.sessionLength)*time.Minute), ) @@ -947,8 +939,8 @@ func (m *Launcher) run(ctx context.Context) (err error) { labelSvc = label.NewLabelController(m.flagger, m.kvService, ls) } - bucketSvc = storage.NewBucketService(bucketSvc, m.engine) - bucketSvc = dbrp.NewBucketService(m.log, bucketSvc, dbrpSvc) + ts.BucketSvc = storage.NewBucketService(ts.BucketSvc, m.engine) + ts.BucketSvc = dbrp.NewBucketService(m.log, ts.BucketSvc, dbrpSvc) m.apibackend = &http.APIBackend{ AssetsPath: m.assetsPath, @@ -959,7 +951,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { NewQueryService: source.NewQueryService, PointsWriter: &storage.LoggingPointsWriter{ Underlying: pointsWriter, - BucketFinder: bucketSvc, + BucketFinder: ts.BucketSvc, LogBucketName: platform.MonitoringSystemBucketName, }, DeleteService: deleteService, @@ -968,12 +960,12 @@ func (m *Launcher) run(ctx context.Context) (err error) { AuthorizationService: authSvc, AlgoWProxy: &http.NoopProxyHandler{}, // Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine. - BucketService: bucketSvc, + BucketService: ts.BucketSvc, SessionService: sessionSvc, - UserService: userSvc, + UserService: ts.UserSvc, DBRPService: dbrpSvc, - OrganizationService: orgSvc, - UserResourceMappingService: userResourceSvc, + OrganizationService: ts.OrgSvc, + UserResourceMappingService: ts.UrmSvc, LabelService: labelSvc, DashboardService: dashboardSvc, DashboardOperationLogService: dashboardLogSvc, @@ -982,14 +974,14 @@ func (m *Launcher) run(ctx context.Context) (err error) { OrganizationOperationLogService: orgLogSvc, SourceService: sourceSvc, VariableService: variableSvc, - PasswordsService: passwdsSvc, + PasswordsService: ts.PasswordSvc, InfluxQLService: storageQueryService, FluxService: storageQueryService, FluxLanguageService: fluxlang.DefaultService, TaskService: taskSvc, TelegrafService: telegrafSvc, NotificationRuleStore: notificationRuleSvc, - NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, userResourceSvc, orgSvc), + NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, ts.UrmSvc, ts.OrgSvc), CheckService: checkSvc, ScraperTargetStoreService: scraperTargetSvc, ChronografService: chronografSvc, @@ -1011,17 +1003,17 @@ func (m *Launcher) run(ctx context.Context) (err error) { { b := m.apibackend authedOrgSVC := authorizer.NewOrgService(b.OrganizationService) - authedURMSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService) + authedUrmSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService) pkgerLogger := m.log.With(zap.String("service", "pkger")) pkgSVC = pkger.NewService( pkger.WithLogger(pkgerLogger), pkger.WithStore(pkger.NewStoreKV(m.kvStore)), pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService, b.UserResourceMappingService)), - pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedURMSVC, authedOrgSVC)), + pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedUrmSVC, authedOrgSVC)), pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)), pkger.WithLabelSVC(authorizer.NewLabelServiceWithOrg(b.LabelService, b.OrgLookupService)), - pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedURMSVC, authedOrgSVC)), - pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedURMSVC, authedOrgSVC)), + pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedUrmSVC, authedOrgSVC)), + pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedUrmSVC, authedOrgSVC)), pkger.WithOrganizationService(authorizer.NewOrgService(b.OrganizationService)), pkger.WithSecretSVC(authorizer.NewSecretService(b.SecretService)), pkger.WithTaskSVC(authorizer.NewTaskService(pkgerLogger, b.TaskService)), @@ -1052,10 +1044,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { templatesHTTPServer = pkger.NewHTTPServerTemplates(tLogger, pkgSVC) } - var userHTTPServer *tenant.UserHandler - { - userHTTPServer = tenant.NewHTTPUserHandler(m.log.With(zap.String("handler", "user")), tenant.NewAuthedUserService(userSvc), tenant.NewAuthedPasswordService(passwdsSvc)) - } + userHTTPServer := ts.NewUserHTTPHandler(m.log) var onboardHTTPServer *tenant.OnboardHandler { @@ -1107,23 +1096,12 @@ func (m *Launcher) run(ctx context.Context) (err error) { var sessionHTTPServer *session.SessionHandler { - sessionHTTPServer = session.NewSessionHandler(m.log.With(zap.String("handler", "session")), sessionSvc, userSvc, passwdsSvc) + sessionHTTPServer = session.NewSessionHandler(m.log.With(zap.String("handler", "session")), sessionSvc, ts.UserSvc, ts.PasswordSvc) } - var orgHTTPServer *tenant.OrgHandler - { - secretHandler := secret.NewHandler(m.log, "id", secret.NewAuthedService(secretSvc)) - urmHandler := tenant.NewURMHandler(m.log.With(zap.String("handler", "urm")), platform.OrgsResourceType, "id", userSvc, tenant.NewAuthedURMService(orgSvc, userResourceSvc)) - labelHandler := label.NewHTTPEmbeddedHandler(m.log.With(zap.String("handler", "label")), platform.OrgsResourceType, labelSvc) - orgHTTPServer = tenant.NewHTTPOrgHandler(m.log.With(zap.String("handler", "org")), tenant.NewAuthedOrgService(orgSvc), urmHandler, labelHandler, secretHandler) - } + orgHTTPServer := ts.NewOrgHTTPHandler(m.log, labelSvc, secret.NewAuthedService(secretSvc)) - var bucketHTTPServer *tenant.BucketHandler - { - urmHandler := tenant.NewURMHandler(m.log.With(zap.String("handler", "urm")), platform.OrgsResourceType, "id", userSvc, tenant.NewAuthedURMService(orgSvc, userResourceSvc)) - labelHandler := label.NewHTTPEmbeddedHandler(m.log.With(zap.String("handler", "label")), platform.BucketsResourceType, labelSvc) - bucketHTTPServer = tenant.NewHTTPBucketHandler(m.log.With(zap.String("handler", "bucket")), tenant.NewAuthedBucketService(bucketSvc), labelSvc, urmHandler, labelHandler) - } + bucketHTTPServer := ts.NewBucketHTTPHandler(m.log, labelSvc) { platformHandler := http.NewPlatformHandler(m.apibackend, diff --git a/tenant/service.go b/tenant/service.go index 89fe0c6b0d..15e19585eb 100644 --- a/tenant/service.go +++ b/tenant/service.go @@ -1,6 +1,13 @@ package tenant -import "github.com/influxdata/influxdb/v2" +import ( + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/metric" + "github.com/influxdata/influxdb/v2/label" + "github.com/influxdata/influxdb/v2/secret" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) type Service struct { store *Store @@ -11,3 +18,39 @@ func NewService(st *Store) influxdb.TenantService { store: st, } } + +type TenantSystem struct { + UserSvc influxdb.UserService + PasswordSvc influxdb.PasswordsService + UrmSvc influxdb.UserResourceMappingService + OrgSvc influxdb.OrganizationService + BucketSvc influxdb.BucketService +} + +func NewSystem(store *Store, log *zap.Logger, reg prometheus.Registerer, metricOpts ...metric.ClientOptFn) *TenantSystem { + ts := NewService(store) + return &TenantSystem{ + UserSvc: NewUserLogger(log, NewUserMetrics(reg, ts, metricOpts...)), + PasswordSvc: NewPasswordLogger(log, NewPasswordMetrics(reg, ts, metricOpts...)), + UrmSvc: NewURMLogger(log, NewUrmMetrics(reg, ts, metricOpts...)), + OrgSvc: NewOrgLogger(log, NewOrgMetrics(reg, ts, metricOpts...)), + BucketSvc: NewBucketLogger(log, NewBucketMetrics(reg, ts, metricOpts...)), + } +} + +func (ts *TenantSystem) NewOrgHTTPHandler(log *zap.Logger, labelSvc influxdb.LabelService, secretSvc influxdb.SecretService) *OrgHandler { + secretHandler := secret.NewHandler(log, "id", secret.NewAuthedService(secretSvc)) + urmHandler := NewURMHandler(log.With(zap.String("handler", "urm")), influxdb.OrgsResourceType, "id", ts.UserSvc, NewAuthedURMService(ts.OrgSvc, ts.UrmSvc)) + labelHandler := label.NewHTTPEmbeddedHandler(log.With(zap.String("handler", "label")), influxdb.OrgsResourceType, labelSvc) + return NewHTTPOrgHandler(log.With(zap.String("handler", "org")), NewAuthedOrgService(ts.OrgSvc), urmHandler, labelHandler, secretHandler) +} + +func (ts *TenantSystem) NewBucketHTTPHandler(log *zap.Logger, labelSvc influxdb.LabelService) *BucketHandler { + urmHandler := NewURMHandler(log.With(zap.String("handler", "urm")), influxdb.OrgsResourceType, "id", ts.UserSvc, NewAuthedURMService(ts.OrgSvc, ts.UrmSvc)) + labelHandler := label.NewHTTPEmbeddedHandler(log.With(zap.String("handler", "label")), influxdb.BucketsResourceType, labelSvc) + return NewHTTPBucketHandler(log.With(zap.String("handler", "bucket")), NewAuthedBucketService(ts.BucketSvc), labelSvc, urmHandler, labelHandler) +} + +func (ts *TenantSystem) NewUserHTTPHandler(log *zap.Logger) *UserHandler { + return NewHTTPUserHandler(log.With(zap.String("handler", "user")), NewAuthedUserService(ts.UserSvc), NewAuthedPasswordService(ts.PasswordSvc)) +}