refactor: add new tenant system struct to consolidate services (#18833)

pull/18867/head
Alirie Gray 2020-07-06 10:49:51 -07:00 committed by GitHub
parent e042fa8207
commit e02e950ed0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 48 deletions

View File

@ -658,15 +658,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
)
tenantStore := tenant.NewStore(m.kvStore)
ts := tenant.NewService(tenantStore)
var (
userSvc platform.UserService = tenant.NewUserLogger(m.log.With(zap.String("store", "new")), tenant.NewUserMetrics(m.reg, ts, metric.WithSuffix("new")))
orgSvc platform.OrganizationService = tenant.NewOrgLogger(m.log.With(zap.String("store", "new")), tenant.NewOrgMetrics(m.reg, ts, metric.WithSuffix("new")))
userResourceSvc platform.UserResourceMappingService = tenant.NewURMLogger(m.log.With(zap.String("store", "new")), tenant.NewUrmMetrics(m.reg, ts, metric.WithSuffix("new")))
bucketSvc platform.BucketService = tenant.NewBucketLogger(m.log.With(zap.String("store", "new")), tenant.NewBucketMetrics(m.reg, ts, metric.WithSuffix("new")))
passwdsSvc platform.PasswordsService = tenant.NewPasswordLogger(m.log.With(zap.String("store", "new")), tenant.NewPasswordMetrics(m.reg, ts, metric.WithSuffix("new")))
)
ts := tenant.NewSystem(tenantStore, m.log.With(zap.String("store", "new")), m.reg, metric.WithSuffix("new"))
secretStore, err := secret.NewStore(m.kvStore)
if err != nil {
@ -702,11 +694,11 @@ func (m *Launcher) run(ctx context.Context) (err error) {
if m.testing {
// the testing engine will write/read into a temporary directory
engine := NewTemporaryEngine(m.StorageConfig, storage.WithRetentionEnforcer(bucketSvc))
engine := NewTemporaryEngine(m.StorageConfig, storage.WithRetentionEnforcer(ts.BucketSvc))
flushers = append(flushers, engine)
m.engine = engine
} else {
m.engine = storage.NewEngine(m.enginePath, m.StorageConfig, storage.WithRetentionEnforcer(bucketSvc))
m.engine = storage.NewEngine(m.enginePath, m.StorageConfig, storage.WithRetentionEnforcer(ts.BucketSvc))
}
m.engine.WithLogger(m.log)
if err := m.engine.Open(ctx); err != nil {
@ -725,8 +717,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
deps, err := influxdb.NewDependencies(
storageflux.NewReader(readservice.NewStore(m.engine)),
m.engine,
authorizer.NewBucketService(bucketSvc, userResourceSvc),
authorizer.NewOrgService(orgSvc),
authorizer.NewBucketService(ts.BucketSvc, ts.UrmSvc),
authorizer.NewOrgService(ts.OrgSvc),
authorizer.NewSecretService(secretSvc),
nil,
)
@ -815,7 +807,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}
}
dbrpSvc := dbrp.NewService(ctx, authorizer.NewBucketService(bucketSvc, userResourceSvc), m.kvStore)
dbrpSvc := dbrp.NewService(ctx, authorizer.NewBucketService(ts.BucketSvc, ts.UrmSvc), m.kvStore)
dbrpSvc = dbrp.NewAuthorizedService(dbrpSvc)
var checkSvc platform.CheckService
@ -927,8 +919,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
{
sessionSvc = session.NewService(
session.NewStorage(inmem.NewSessionStore()),
userSvc,
userResourceSvc,
ts.UserSvc,
ts.UrmSvc,
authSvc,
session.WithSessionLength(time.Duration(m.sessionLength)*time.Minute),
)
@ -947,8 +939,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
labelSvc = label.NewLabelController(m.flagger, m.kvService, ls)
}
bucketSvc = storage.NewBucketService(bucketSvc, m.engine)
bucketSvc = dbrp.NewBucketService(m.log, bucketSvc, dbrpSvc)
ts.BucketSvc = storage.NewBucketService(ts.BucketSvc, m.engine)
ts.BucketSvc = dbrp.NewBucketService(m.log, ts.BucketSvc, dbrpSvc)
m.apibackend = &http.APIBackend{
AssetsPath: m.assetsPath,
@ -959,7 +951,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
NewQueryService: source.NewQueryService,
PointsWriter: &storage.LoggingPointsWriter{
Underlying: pointsWriter,
BucketFinder: bucketSvc,
BucketFinder: ts.BucketSvc,
LogBucketName: platform.MonitoringSystemBucketName,
},
DeleteService: deleteService,
@ -968,12 +960,12 @@ func (m *Launcher) run(ctx context.Context) (err error) {
AuthorizationService: authSvc,
AlgoWProxy: &http.NoopProxyHandler{},
// Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine.
BucketService: bucketSvc,
BucketService: ts.BucketSvc,
SessionService: sessionSvc,
UserService: userSvc,
UserService: ts.UserSvc,
DBRPService: dbrpSvc,
OrganizationService: orgSvc,
UserResourceMappingService: userResourceSvc,
OrganizationService: ts.OrgSvc,
UserResourceMappingService: ts.UrmSvc,
LabelService: labelSvc,
DashboardService: dashboardSvc,
DashboardOperationLogService: dashboardLogSvc,
@ -982,14 +974,14 @@ func (m *Launcher) run(ctx context.Context) (err error) {
OrganizationOperationLogService: orgLogSvc,
SourceService: sourceSvc,
VariableService: variableSvc,
PasswordsService: passwdsSvc,
PasswordsService: ts.PasswordSvc,
InfluxQLService: storageQueryService,
FluxService: storageQueryService,
FluxLanguageService: fluxlang.DefaultService,
TaskService: taskSvc,
TelegrafService: telegrafSvc,
NotificationRuleStore: notificationRuleSvc,
NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, userResourceSvc, orgSvc),
NotificationEndpointService: endpoints.NewService(notificationEndpointStore, secretSvc, ts.UrmSvc, ts.OrgSvc),
CheckService: checkSvc,
ScraperTargetStoreService: scraperTargetSvc,
ChronografService: chronografSvc,
@ -1011,17 +1003,17 @@ func (m *Launcher) run(ctx context.Context) (err error) {
{
b := m.apibackend
authedOrgSVC := authorizer.NewOrgService(b.OrganizationService)
authedURMSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService)
authedUrmSVC := authorizer.NewURMService(b.OrgLookupService, b.UserResourceMappingService)
pkgerLogger := m.log.With(zap.String("service", "pkger"))
pkgSVC = pkger.NewService(
pkger.WithLogger(pkgerLogger),
pkger.WithStore(pkger.NewStoreKV(m.kvStore)),
pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService, b.UserResourceMappingService)),
pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedURMSVC, authedOrgSVC)),
pkger.WithCheckSVC(authorizer.NewCheckService(b.CheckService, authedUrmSVC, authedOrgSVC)),
pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)),
pkger.WithLabelSVC(authorizer.NewLabelServiceWithOrg(b.LabelService, b.OrgLookupService)),
pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedURMSVC, authedOrgSVC)),
pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedURMSVC, authedOrgSVC)),
pkger.WithNotificationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, authedUrmSVC, authedOrgSVC)),
pkger.WithNotificationRuleSVC(authorizer.NewNotificationRuleStore(b.NotificationRuleStore, authedUrmSVC, authedOrgSVC)),
pkger.WithOrganizationService(authorizer.NewOrgService(b.OrganizationService)),
pkger.WithSecretSVC(authorizer.NewSecretService(b.SecretService)),
pkger.WithTaskSVC(authorizer.NewTaskService(pkgerLogger, b.TaskService)),
@ -1052,10 +1044,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
templatesHTTPServer = pkger.NewHTTPServerTemplates(tLogger, pkgSVC)
}
var userHTTPServer *tenant.UserHandler
{
userHTTPServer = tenant.NewHTTPUserHandler(m.log.With(zap.String("handler", "user")), tenant.NewAuthedUserService(userSvc), tenant.NewAuthedPasswordService(passwdsSvc))
}
userHTTPServer := ts.NewUserHTTPHandler(m.log)
var onboardHTTPServer *tenant.OnboardHandler
{
@ -1107,23 +1096,12 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var sessionHTTPServer *session.SessionHandler
{
sessionHTTPServer = session.NewSessionHandler(m.log.With(zap.String("handler", "session")), sessionSvc, userSvc, passwdsSvc)
sessionHTTPServer = session.NewSessionHandler(m.log.With(zap.String("handler", "session")), sessionSvc, ts.UserSvc, ts.PasswordSvc)
}
var orgHTTPServer *tenant.OrgHandler
{
secretHandler := secret.NewHandler(m.log, "id", secret.NewAuthedService(secretSvc))
urmHandler := tenant.NewURMHandler(m.log.With(zap.String("handler", "urm")), platform.OrgsResourceType, "id", userSvc, tenant.NewAuthedURMService(orgSvc, userResourceSvc))
labelHandler := label.NewHTTPEmbeddedHandler(m.log.With(zap.String("handler", "label")), platform.OrgsResourceType, labelSvc)
orgHTTPServer = tenant.NewHTTPOrgHandler(m.log.With(zap.String("handler", "org")), tenant.NewAuthedOrgService(orgSvc), urmHandler, labelHandler, secretHandler)
}
orgHTTPServer := ts.NewOrgHTTPHandler(m.log, labelSvc, secret.NewAuthedService(secretSvc))
var bucketHTTPServer *tenant.BucketHandler
{
urmHandler := tenant.NewURMHandler(m.log.With(zap.String("handler", "urm")), platform.OrgsResourceType, "id", userSvc, tenant.NewAuthedURMService(orgSvc, userResourceSvc))
labelHandler := label.NewHTTPEmbeddedHandler(m.log.With(zap.String("handler", "label")), platform.BucketsResourceType, labelSvc)
bucketHTTPServer = tenant.NewHTTPBucketHandler(m.log.With(zap.String("handler", "bucket")), tenant.NewAuthedBucketService(bucketSvc), labelSvc, urmHandler, labelHandler)
}
bucketHTTPServer := ts.NewBucketHTTPHandler(m.log, labelSvc)
{
platformHandler := http.NewPlatformHandler(m.apibackend,

View File

@ -1,6 +1,13 @@
package tenant
import "github.com/influxdata/influxdb/v2"
import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/metric"
"github.com/influxdata/influxdb/v2/label"
"github.com/influxdata/influxdb/v2/secret"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
type Service struct {
store *Store
@ -11,3 +18,39 @@ func NewService(st *Store) influxdb.TenantService {
store: st,
}
}
type TenantSystem struct {
UserSvc influxdb.UserService
PasswordSvc influxdb.PasswordsService
UrmSvc influxdb.UserResourceMappingService
OrgSvc influxdb.OrganizationService
BucketSvc influxdb.BucketService
}
func NewSystem(store *Store, log *zap.Logger, reg prometheus.Registerer, metricOpts ...metric.ClientOptFn) *TenantSystem {
ts := NewService(store)
return &TenantSystem{
UserSvc: NewUserLogger(log, NewUserMetrics(reg, ts, metricOpts...)),
PasswordSvc: NewPasswordLogger(log, NewPasswordMetrics(reg, ts, metricOpts...)),
UrmSvc: NewURMLogger(log, NewUrmMetrics(reg, ts, metricOpts...)),
OrgSvc: NewOrgLogger(log, NewOrgMetrics(reg, ts, metricOpts...)),
BucketSvc: NewBucketLogger(log, NewBucketMetrics(reg, ts, metricOpts...)),
}
}
func (ts *TenantSystem) NewOrgHTTPHandler(log *zap.Logger, labelSvc influxdb.LabelService, secretSvc influxdb.SecretService) *OrgHandler {
secretHandler := secret.NewHandler(log, "id", secret.NewAuthedService(secretSvc))
urmHandler := NewURMHandler(log.With(zap.String("handler", "urm")), influxdb.OrgsResourceType, "id", ts.UserSvc, NewAuthedURMService(ts.OrgSvc, ts.UrmSvc))
labelHandler := label.NewHTTPEmbeddedHandler(log.With(zap.String("handler", "label")), influxdb.OrgsResourceType, labelSvc)
return NewHTTPOrgHandler(log.With(zap.String("handler", "org")), NewAuthedOrgService(ts.OrgSvc), urmHandler, labelHandler, secretHandler)
}
func (ts *TenantSystem) NewBucketHTTPHandler(log *zap.Logger, labelSvc influxdb.LabelService) *BucketHandler {
urmHandler := NewURMHandler(log.With(zap.String("handler", "urm")), influxdb.OrgsResourceType, "id", ts.UserSvc, NewAuthedURMService(ts.OrgSvc, ts.UrmSvc))
labelHandler := label.NewHTTPEmbeddedHandler(log.With(zap.String("handler", "label")), influxdb.BucketsResourceType, labelSvc)
return NewHTTPBucketHandler(log.With(zap.String("handler", "bucket")), NewAuthedBucketService(ts.BucketSvc), labelSvc, urmHandler, labelHandler)
}
func (ts *TenantSystem) NewUserHTTPHandler(log *zap.Logger) *UserHandler {
return NewHTTPUserHandler(log.With(zap.String("handler", "user")), NewAuthedUserService(ts.UserSvc), NewAuthedPasswordService(ts.PasswordSvc))
}