Merge pull request #17385 from influxdata/la/dup-read
feat(tenant): add duplicate read servicepull/17362/head
commit
b9fc4e5668
|
@ -50,11 +50,12 @@ import (
|
|||
"github.com/influxdata/influxdb/task/backend/middleware"
|
||||
"github.com/influxdata/influxdb/task/backend/scheduler"
|
||||
"github.com/influxdata/influxdb/telemetry"
|
||||
"github.com/influxdata/influxdb/tenant"
|
||||
_ "github.com/influxdata/influxdb/tsdb/tsi1" // needed for tsi1
|
||||
_ "github.com/influxdata/influxdb/tsdb/tsm1" // needed for tsm1
|
||||
"github.com/influxdata/influxdb/vault"
|
||||
pzap "github.com/influxdata/influxdb/zap"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/spf13/cobra"
|
||||
jaegerconfig "github.com/uber/jaeger-client-go/config"
|
||||
|
@ -260,11 +261,22 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
|
|||
Default: "",
|
||||
Desc: "TLS key for HTTPs",
|
||||
},
|
||||
{
|
||||
DestP: &l.enableNewMetaStore,
|
||||
Flag: "new-meta-store",
|
||||
Default: false,
|
||||
Desc: "enables the new meta store",
|
||||
},
|
||||
{
|
||||
DestP: &l.newMetaStoreReadOnly,
|
||||
Flag: "new-meta-store-read-only",
|
||||
Default: true,
|
||||
Desc: "toggles read-only mode for the new meta store, if so, the reads are duplicated between the old and new store (has meaning only if the new meta store is enabled)",
|
||||
},
|
||||
}
|
||||
|
||||
cli.BindOptions(cmd, opts)
|
||||
cmd.AddCommand(inspect.NewCommand())
|
||||
|
||||
}
|
||||
|
||||
// Launcher represents the main program execution.
|
||||
|
@ -288,7 +300,11 @@ type Launcher struct {
|
|||
enginePath string
|
||||
secretStore string
|
||||
|
||||
enableNewMetaStore bool
|
||||
newMetaStoreReadOnly bool
|
||||
|
||||
boltClient *bolt.Client
|
||||
kvStore kv.Store
|
||||
kvService *kv.Service
|
||||
engine Engine
|
||||
StorageConfig storage.Config
|
||||
|
@ -487,12 +503,14 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
case BoltStore:
|
||||
store := bolt.NewKVStore(m.log.With(zap.String("service", "kvstore-bolt")), m.boltPath)
|
||||
store.WithDB(m.boltClient.DB())
|
||||
m.kvStore = store
|
||||
m.kvService = kv.NewService(m.log.With(zap.String("store", "kv")), store, serviceConfig)
|
||||
if m.testing {
|
||||
flushers = append(flushers, store)
|
||||
}
|
||||
case MemoryStore:
|
||||
store := inmem.NewKVStore()
|
||||
m.kvStore = store
|
||||
m.kvService = kv.NewService(m.log.With(zap.String("store", "kv")), store, serviceConfig)
|
||||
if m.testing {
|
||||
flushers = append(flushers, store)
|
||||
|
@ -539,6 +557,33 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
notificationEndpointStore platform.NotificationEndpointService = m.kvService
|
||||
)
|
||||
|
||||
if m.enableNewMetaStore {
|
||||
var ts platform.TenantService
|
||||
if m.newMetaStoreReadOnly {
|
||||
store, err := tenant.NewReadOnlyStore(m.kvStore)
|
||||
if err != nil {
|
||||
m.log.Error("Failed creating new meta store", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
oldSvc := m.kvService
|
||||
newSvc := tenant.NewService(store)
|
||||
ts = tenant.NewDuplicateReadTenantService(m.log, oldSvc, newSvc)
|
||||
} else {
|
||||
store, err := tenant.NewStore(m.kvStore)
|
||||
if err != nil {
|
||||
m.log.Error("Failed creating new meta store", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
ts = tenant.NewService(store)
|
||||
}
|
||||
|
||||
bucketSvc = tenant.NewBucketLogger(m.log.With(zap.String("store", "new")), tenant.NewBucketMetrics(m.reg, ts, tenant.WithSuffix("new")))
|
||||
orgSvc = tenant.NewOrgLogger(m.log.With(zap.String("store", "new")), tenant.NewOrgMetrics(m.reg, ts, tenant.WithSuffix("new")))
|
||||
userResourceSvc = tenant.NewURMLogger(m.log.With(zap.String("store", "new")), tenant.NewUrmMetrics(m.reg, ts, tenant.WithSuffix("new")))
|
||||
userSvc = tenant.NewUserLogger(m.log.With(zap.String("store", "new")), tenant.NewUserMetrics(m.reg, ts, tenant.WithSuffix("new")))
|
||||
passwdsSvc = tenant.NewPasswordLogger(m.log.With(zap.String("store", "new")), tenant.NewPasswordMetrics(m.reg, ts, tenant.WithSuffix("new")))
|
||||
}
|
||||
|
||||
switch m.secretStore {
|
||||
case "bolt":
|
||||
// If it is bolt, then we already set it above.
|
||||
|
@ -942,6 +987,11 @@ func (m *Launcher) UserService() platform.UserService {
|
|||
return m.apibackend.UserService
|
||||
}
|
||||
|
||||
// UserResourceMappingService returns the internal user resource mapping service.
|
||||
func (m *Launcher) UserResourceMappingService() platform.UserResourceMappingService {
|
||||
return m.apibackend.UserResourceMappingService
|
||||
}
|
||||
|
||||
// AuthorizationService returns the internal authorization service.
|
||||
func (m *Launcher) AuthorizationService() platform.AuthorizationService {
|
||||
return m.apibackend.AuthorizationService
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
package launcher_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/cmd/influxd/launcher"
|
||||
icontext "github.com/influxdata/influxdb/context"
|
||||
)
|
||||
|
||||
func testTenant(t *testing.T, args ...string) {
|
||||
l := launcher.RunTestLauncherOrFail(t, ctx, args...)
|
||||
l.SetupOrFail(t)
|
||||
defer l.ShutdownOrFail(t, ctx)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = icontext.SetAuthorizer(ctx, l.Auth)
|
||||
|
||||
o := &influxdb.Organization{Name: "a-org"}
|
||||
if err := l.OrganizationService().CreateOrganization(ctx, o); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
u := &influxdb.User{Name: "a-user"}
|
||||
if err := l.UserService().CreateUser(ctx, u); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := l.BucketService(t).CreateBucket(ctx, &influxdb.Bucket{Name: "a-bucket", OrgID: o.ID}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := l.UserResourceMappingService().CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
|
||||
UserID: u.ID,
|
||||
UserType: influxdb.Owner,
|
||||
ResourceType: influxdb.OrgsResourceType,
|
||||
ResourceID: o.ID,
|
||||
}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if _, _, err := l.BucketService(t).FindBuckets(ctx, influxdb.BucketFilter{}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, _, err := l.OrganizationService().FindOrganizations(ctx, influxdb.OrganizationFilter{}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, _, err := l.UserService().FindUsers(ctx, influxdb.UserFilter{}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, _, err := l.UserResourceMappingService().FindUserResourceMappings(ctx, influxdb.UserResourceMappingFilter{}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Tenant(t *testing.T) {
|
||||
t.Run("tenant service", func(t *testing.T) {
|
||||
testTenant(t)
|
||||
})
|
||||
|
||||
t.Run("duplicate read tenant service", func(t *testing.T) {
|
||||
testTenant(t, "--new-meta-store")
|
||||
})
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
package kv_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/tenant"
|
||||
influxdbtesting "github.com/influxdata/influxdb/testing"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func initBoltDuplicateReadBucketService(f influxdbtesting.BucketFields, t *testing.T) (influxdb.BucketService, string, func()) {
|
||||
s, closeInMem, err := NewTestInmemStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, op, closeSvc := initBucketService(s, f, t)
|
||||
ro, err := tenant.NewReadOnlyStore(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadBucketService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, op, func() {
|
||||
closeSvc()
|
||||
closeInMem()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadBucketService(t *testing.T) {
|
||||
influxdbtesting.BucketService(initBoltDuplicateReadBucketService, t)
|
||||
}
|
||||
|
||||
func initBoltDuplicateReadOrganizationService(f influxdbtesting.OrganizationFields, t *testing.T) (influxdb.OrganizationService, string, func()) {
|
||||
s, closeBolt, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, op, closeSvc := initOrganizationService(s, f, t)
|
||||
ro, err := tenant.NewReadOnlyStore(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadOrganizationService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, op, func() {
|
||||
closeSvc()
|
||||
closeBolt()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadOrganizationService(t *testing.T) {
|
||||
influxdbtesting.OrganizationService(initBoltDuplicateReadOrganizationService, t)
|
||||
}
|
||||
|
||||
func initBoltDuplicateReadUserResourceMappingService(f influxdbtesting.UserResourceFields, t *testing.T) (influxdb.UserResourceMappingService, func()) {
|
||||
s, closeBolt, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, closeSvc := initUserResourceMappingService(s, f, t)
|
||||
ro, err := tenant.NewReadOnlyStore(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadUserResourceMappingService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, func() {
|
||||
closeSvc()
|
||||
closeBolt()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadUserResourceMappingService(t *testing.T) {
|
||||
influxdbtesting.UserResourceMappingService(initBoltDuplicateReadUserResourceMappingService, t)
|
||||
}
|
||||
|
||||
func initBoltDuplicateReadUserService(f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) {
|
||||
s, closeBolt, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, op, closeSvc := initUserService(s, f, t)
|
||||
ro, err := tenant.NewReadOnlyStore(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadUserService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, op, func() {
|
||||
closeSvc()
|
||||
closeBolt()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadUserService(t *testing.T) {
|
||||
influxdbtesting.UserService(initBoltDuplicateReadUserService, t)
|
||||
}
|
||||
|
||||
func initBoltDuplicateReadPasswordsService(f influxdbtesting.PasswordFields, t *testing.T) (influxdb.PasswordsService, func()) {
|
||||
s, closeStore, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
svc, closeSvc := initPasswordsService(s, f, t)
|
||||
ro, err := tenant.NewReadOnlyStore(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
newSvc := tenant.NewService(ro)
|
||||
svc = tenant.NewDuplicateReadPasswordsService(zaptest.NewLogger(t), svc, newSvc)
|
||||
return svc, func() {
|
||||
closeSvc()
|
||||
closeStore()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBoltDuplicateReadPasswordService(t *testing.T) {
|
||||
influxdbtesting.PasswordsService(initBoltDuplicateReadPasswordsService, t)
|
||||
}
|
|
@ -0,0 +1,333 @@
|
|||
package tenant
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kv"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var _ influxdb.TenantService = (*tenantService)(nil)
|
||||
|
||||
// readOnlyStore is a wrapper for kv.Store that ensures that updates are not applied.
|
||||
type readOnlyStore struct {
|
||||
kv.Store
|
||||
}
|
||||
|
||||
func (r readOnlyStore) Update(context.Context, func(kv.Tx) error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type tenantService struct {
|
||||
log *zap.Logger
|
||||
|
||||
oldBucketSvc influxdb.BucketService
|
||||
newBucketSvc influxdb.BucketService
|
||||
oldOrgSvc influxdb.OrganizationService
|
||||
newOrgSvc influxdb.OrganizationService
|
||||
oldURMSvc influxdb.UserResourceMappingService
|
||||
newURMSvc influxdb.UserResourceMappingService
|
||||
oldUserSvc influxdb.UserService
|
||||
newUserSvc influxdb.UserService
|
||||
oldPwdSvc influxdb.PasswordsService
|
||||
newPwdSvc influxdb.PasswordsService
|
||||
}
|
||||
|
||||
// NewReadOnlyStore returns a Store that cannot update the underlying kv.Store.
|
||||
func NewReadOnlyStore(store kv.Store) (*Store, error) {
|
||||
return NewStore(readOnlyStore{Store: store})
|
||||
}
|
||||
|
||||
// NewDuplicateReadTenantService returns a tenant service that duplicates the reads to oldSvc and newSvc.
|
||||
// The foreseen use case is to compare two service versions, an old one and a new one.
|
||||
// The resulting influxdb.TenantService:
|
||||
// - forwards writes to the old service;
|
||||
// - reads from the old one, if no error is encountered, it reads from the new one;
|
||||
// - compares the results obtained and logs the difference, if any.
|
||||
func NewDuplicateReadTenantService(log *zap.Logger, oldSvc, newSvc influxdb.TenantService) influxdb.TenantService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldBucketSvc: oldSvc,
|
||||
oldOrgSvc: oldSvc,
|
||||
oldURMSvc: oldSvc,
|
||||
oldUserSvc: oldSvc,
|
||||
oldPwdSvc: oldSvc,
|
||||
|
||||
newBucketSvc: newSvc,
|
||||
newOrgSvc: newSvc,
|
||||
newURMSvc: newSvc,
|
||||
newUserSvc: newSvc,
|
||||
newPwdSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadBucketService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadBucketService(log *zap.Logger, oldSvc, newSvc influxdb.BucketService) influxdb.BucketService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldBucketSvc: oldSvc,
|
||||
newBucketSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadOrganizationService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadOrganizationService(log *zap.Logger, oldSvc, newSvc influxdb.OrganizationService) influxdb.OrganizationService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldOrgSvc: oldSvc,
|
||||
newOrgSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadUserResourceMappingService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadUserResourceMappingService(log *zap.Logger, oldSvc, newSvc influxdb.UserResourceMappingService) influxdb.UserResourceMappingService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldURMSvc: oldSvc,
|
||||
newURMSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadUserService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadUserService(log *zap.Logger, oldSvc, newSvc influxdb.UserService) influxdb.UserService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldUserSvc: oldSvc,
|
||||
newUserSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// NewDuplicateReadPasswordsService returns a service that duplicates the reads to oldSvc and newSvc.
|
||||
func NewDuplicateReadPasswordsService(log *zap.Logger, oldSvc, newSvc influxdb.PasswordsService) influxdb.PasswordsService {
|
||||
return tenantService{
|
||||
log: log,
|
||||
|
||||
oldPwdSvc: oldSvc,
|
||||
newPwdSvc: newSvc,
|
||||
}
|
||||
}
|
||||
|
||||
func (s tenantService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
|
||||
o, err := s.oldBucketSvc.FindBucketByID(ctx, id)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newBucketSvc.FindBucketByID(ctx, id)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByID"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) {
|
||||
o, err := s.oldBucketSvc.FindBucketByName(ctx, orgID, name)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newBucketSvc.FindBucketByName(ctx, orgID, name)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucketByName"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error) {
|
||||
o, err := s.oldBucketSvc.FindBucket(ctx, filter)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newBucketSvc.FindBucket(ctx, filter)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBucket"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error) {
|
||||
o, no, err := s.oldBucketSvc.FindBuckets(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return o, no, err
|
||||
}
|
||||
n, _, err := s.newBucketSvc.FindBuckets(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindBuckets"))
|
||||
}
|
||||
return o, no, nil
|
||||
}
|
||||
|
||||
func (s tenantService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
|
||||
return s.oldBucketSvc.CreateBucket(ctx, b)
|
||||
}
|
||||
|
||||
func (s tenantService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) {
|
||||
return s.oldBucketSvc.UpdateBucket(ctx, id, upd)
|
||||
}
|
||||
|
||||
func (s tenantService) DeleteBucket(ctx context.Context, id influxdb.ID) error {
|
||||
return s.oldBucketSvc.DeleteBucket(ctx, id)
|
||||
}
|
||||
|
||||
func (s tenantService) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) {
|
||||
o, err := s.oldOrgSvc.FindOrganizationByID(ctx, id)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newOrgSvc.FindOrganizationByID(ctx, id)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizationByID"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindOrganization(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) {
|
||||
o, err := s.oldOrgSvc.FindOrganization(ctx, filter)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newOrgSvc.FindOrganization(ctx, filter)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganization"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindOrganizations(ctx context.Context, filter influxdb.OrganizationFilter, opt ...influxdb.FindOptions) ([]*influxdb.Organization, int, error) {
|
||||
o, no, err := s.oldOrgSvc.FindOrganizations(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return o, no, err
|
||||
}
|
||||
n, _, err := s.newOrgSvc.FindOrganizations(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindOrganizations"))
|
||||
}
|
||||
return o, no, nil
|
||||
}
|
||||
|
||||
func (s tenantService) CreateOrganization(ctx context.Context, b *influxdb.Organization) error {
|
||||
return s.oldOrgSvc.CreateOrganization(ctx, b)
|
||||
}
|
||||
|
||||
func (s tenantService) UpdateOrganization(ctx context.Context, id influxdb.ID, upd influxdb.OrganizationUpdate) (*influxdb.Organization, error) {
|
||||
return s.oldOrgSvc.UpdateOrganization(ctx, id, upd)
|
||||
}
|
||||
|
||||
func (s tenantService) DeleteOrganization(ctx context.Context, id influxdb.ID) error {
|
||||
return s.oldOrgSvc.DeleteOrganization(ctx, id)
|
||||
}
|
||||
|
||||
func (s tenantService) FindUserResourceMappings(ctx context.Context, filter influxdb.UserResourceMappingFilter, opt ...influxdb.FindOptions) ([]*influxdb.UserResourceMapping, int, error) {
|
||||
o, no, err := s.oldURMSvc.FindUserResourceMappings(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return o, no, err
|
||||
}
|
||||
n, _, err := s.newURMSvc.FindUserResourceMappings(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserResourceMappings"))
|
||||
}
|
||||
return o, no, nil
|
||||
}
|
||||
|
||||
func (s tenantService) CreateUserResourceMapping(ctx context.Context, m *influxdb.UserResourceMapping) error {
|
||||
return s.oldURMSvc.CreateUserResourceMapping(ctx, m)
|
||||
}
|
||||
|
||||
func (s tenantService) DeleteUserResourceMapping(ctx context.Context, resourceID, userID influxdb.ID) error {
|
||||
return s.oldURMSvc.DeleteUserResourceMapping(ctx, resourceID, userID)
|
||||
}
|
||||
|
||||
func (s tenantService) FindUserByID(ctx context.Context, id influxdb.ID) (*influxdb.User, error) {
|
||||
o, err := s.oldUserSvc.FindUserByID(ctx, id)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newUserSvc.FindUserByID(ctx, id)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUserByID"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindUser(ctx context.Context, filter influxdb.UserFilter) (*influxdb.User, error) {
|
||||
o, err := s.oldUserSvc.FindUser(ctx, filter)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
n, err := s.newUserSvc.FindUser(ctx, filter)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUser"))
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
func (s tenantService) FindUsers(ctx context.Context, filter influxdb.UserFilter, opt ...influxdb.FindOptions) ([]*influxdb.User, int, error) {
|
||||
o, no, err := s.oldUserSvc.FindUsers(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
return o, no, err
|
||||
}
|
||||
n, _, err := s.newUserSvc.FindUsers(ctx, filter, opt...)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
} else if diff := cmp.Diff(o, n); diff != "" {
|
||||
s.log.Error(fmt.Sprintf("unexpected read result -old/+new:\n\t%s", diff), zap.String("diff", diff), zap.String("call", "FindUsers"))
|
||||
}
|
||||
return o, no, nil
|
||||
}
|
||||
|
||||
func (s tenantService) CreateUser(ctx context.Context, u *influxdb.User) error {
|
||||
return s.oldUserSvc.CreateUser(ctx, u)
|
||||
}
|
||||
|
||||
func (s tenantService) UpdateUser(ctx context.Context, id influxdb.ID, upd influxdb.UserUpdate) (*influxdb.User, error) {
|
||||
return s.oldUserSvc.UpdateUser(ctx, id, upd)
|
||||
}
|
||||
|
||||
func (s tenantService) DeleteUser(ctx context.Context, id influxdb.ID) error {
|
||||
return s.oldUserSvc.DeleteUser(ctx, id)
|
||||
}
|
||||
|
||||
func (s tenantService) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
|
||||
return s.oldPwdSvc.SetPassword(ctx, userID, password)
|
||||
}
|
||||
|
||||
func (s tenantService) ComparePassword(ctx context.Context, userID influxdb.ID, password string) error {
|
||||
if err := s.oldPwdSvc.ComparePassword(ctx, userID, password); err != nil {
|
||||
return err
|
||||
}
|
||||
err := s.newPwdSvc.ComparePassword(ctx, userID, password)
|
||||
if err != nil {
|
||||
s.log.Error("error in new meta store", zap.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s tenantService) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, old, new string) error {
|
||||
return s.oldPwdSvc.CompareAndSetPassword(ctx, userID, old, new)
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package tenant
|
||||
|
||||
import "fmt"
|
||||
|
||||
type metricOpts struct {
|
||||
serviceSuffix string
|
||||
}
|
||||
|
||||
func defaultOpts() *metricOpts {
|
||||
return &metricOpts{}
|
||||
}
|
||||
|
||||
func (o *metricOpts) applySuffix(prefix string) string {
|
||||
if o.serviceSuffix != "" {
|
||||
return fmt.Sprintf("%s_%s", prefix, o.serviceSuffix)
|
||||
}
|
||||
return prefix
|
||||
}
|
||||
|
||||
// MetricsOption is an option used by a metric middleware.
|
||||
type MetricsOption func(*metricOpts)
|
||||
|
||||
// WithSuffix returns a metric option that applies a suffix to the service name of the metric.
|
||||
func WithSuffix(suffix string) MetricsOption {
|
||||
return func(opts *metricOpts) {
|
||||
opts.serviceSuffix = suffix
|
||||
}
|
||||
}
|
||||
|
||||
func applyOpts(opts ...MetricsOption) *metricOpts {
|
||||
o := defaultOpts()
|
||||
for _, opt := range opts {
|
||||
opt(o)
|
||||
}
|
||||
return o
|
||||
}
|
|
@ -2,7 +2,6 @@ package tenant
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kit/metric"
|
||||
"github.com/influxdata/influxdb/kit/prom"
|
||||
|
@ -18,9 +17,10 @@ type BucketMetrics struct {
|
|||
var _ influxdb.BucketService = (*BucketMetrics)(nil)
|
||||
|
||||
// NewBucketMetrics returns a metrics service middleware for the Bucket Service.
|
||||
func NewBucketMetrics(reg *prom.Registry, s influxdb.BucketService) *BucketMetrics {
|
||||
func NewBucketMetrics(reg *prom.Registry, s influxdb.BucketService, opts ...MetricsOption) *BucketMetrics {
|
||||
o := applyOpts(opts...)
|
||||
return &BucketMetrics{
|
||||
rec: metric.New(reg, "bucket"),
|
||||
rec: metric.New(reg, o.applySuffix("bucket")),
|
||||
bucketService: s,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,10 @@ type OrgMetrics struct {
|
|||
var _ influxdb.OrganizationService = (*OrgMetrics)(nil)
|
||||
|
||||
// NewOrgMetrics returns a metrics service middleware for the Organization Service.
|
||||
func NewOrgMetrics(reg *prom.Registry, s influxdb.OrganizationService) *OrgMetrics {
|
||||
func NewOrgMetrics(reg *prom.Registry, s influxdb.OrganizationService, opts ...MetricsOption) *OrgMetrics {
|
||||
o := applyOpts(opts...)
|
||||
return &OrgMetrics{
|
||||
rec: metric.New(reg, "org"),
|
||||
rec: metric.New(reg, o.applySuffix("org")),
|
||||
orgService: s,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,10 @@ type UrmMetrics struct {
|
|||
var _ influxdb.UserResourceMappingService = (*UrmMetrics)(nil)
|
||||
|
||||
// NewUrmMetrics returns a metrics service middleware for the User Resource Mapping Service.
|
||||
func NewUrmMetrics(reg *prom.Registry, s influxdb.UserResourceMappingService) *UrmMetrics {
|
||||
func NewUrmMetrics(reg *prom.Registry, s influxdb.UserResourceMappingService, opts ...MetricsOption) *UrmMetrics {
|
||||
o := applyOpts(opts...)
|
||||
return &UrmMetrics{
|
||||
rec: metric.New(reg, "urm"),
|
||||
rec: metric.New(reg, o.applySuffix("urm")),
|
||||
urmService: s,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,9 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var _ influxdb.UserService = (*UserLogger)(nil)
|
||||
var _ influxdb.PasswordsService = (*PasswordLogger)(nil)
|
||||
|
||||
type UserLogger struct {
|
||||
logger *zap.Logger
|
||||
userService influxdb.UserService
|
||||
|
@ -22,8 +25,6 @@ func NewUserLogger(log *zap.Logger, s influxdb.UserService) *UserLogger {
|
|||
}
|
||||
}
|
||||
|
||||
var _ influxdb.UserService = (*UserLogger)(nil)
|
||||
|
||||
func (l *UserLogger) CreateUser(ctx context.Context, u *influxdb.User) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
|
@ -97,3 +98,55 @@ func (l *UserLogger) DeleteUser(ctx context.Context, id influxdb.ID) (err error)
|
|||
}(time.Now())
|
||||
return l.userService.DeleteUser(ctx, id)
|
||||
}
|
||||
|
||||
type PasswordLogger struct {
|
||||
logger *zap.Logger
|
||||
pwdService influxdb.PasswordsService
|
||||
}
|
||||
|
||||
// NewPasswordLogger returns a logging service middleware for the Password Service.
|
||||
func NewPasswordLogger(log *zap.Logger, s influxdb.PasswordsService) *PasswordLogger {
|
||||
return &PasswordLogger{
|
||||
logger: log,
|
||||
pwdService: s,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *PasswordLogger) SetPassword(ctx context.Context, userID influxdb.ID, password string) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failed to set password for user with ID %v", userID)
|
||||
l.logger.Error(msg, zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Info("set password", dur)
|
||||
}(time.Now())
|
||||
return l.pwdService.SetPassword(ctx, userID, password)
|
||||
}
|
||||
|
||||
func (l *PasswordLogger) ComparePassword(ctx context.Context, userID influxdb.ID, password string) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failed to compare password for user with ID %v", userID)
|
||||
l.logger.Error(msg, zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Info("compare password", dur)
|
||||
}(time.Now())
|
||||
return l.pwdService.ComparePassword(ctx, userID, password)
|
||||
}
|
||||
|
||||
func (l *PasswordLogger) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, old, new string) (err error) {
|
||||
defer func(start time.Time) {
|
||||
dur := zap.Duration("took", time.Since(start))
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("failed to compare and set password for user with ID %v", userID)
|
||||
l.logger.Error(msg, zap.Error(err), dur)
|
||||
return
|
||||
}
|
||||
l.logger.Info("compare and set password", dur)
|
||||
}(time.Now())
|
||||
return l.pwdService.CompareAndSetPassword(ctx, userID, old, new)
|
||||
}
|
||||
|
|
|
@ -8,6 +8,9 @@ import (
|
|||
"github.com/influxdata/influxdb/kit/prom"
|
||||
)
|
||||
|
||||
var _ influxdb.UserService = (*UserMetrics)(nil)
|
||||
var _ influxdb.PasswordsService = (*PasswordMetrics)(nil)
|
||||
|
||||
type UserMetrics struct {
|
||||
// RED metrics
|
||||
rec *metric.REDClient
|
||||
|
@ -15,12 +18,11 @@ type UserMetrics struct {
|
|||
userService influxdb.UserService
|
||||
}
|
||||
|
||||
var _ influxdb.UserService = (*UserMetrics)(nil)
|
||||
|
||||
// NewUserMetrics returns a metrics service middleware for the User Service.
|
||||
func NewUserMetrics(reg *prom.Registry, s influxdb.UserService) *UserMetrics {
|
||||
func NewUserMetrics(reg *prom.Registry, s influxdb.UserService, opts ...MetricsOption) *UserMetrics {
|
||||
o := applyOpts(opts...)
|
||||
return &UserMetrics{
|
||||
rec: metric.New(reg, "user"),
|
||||
rec: metric.New(reg, o.applySuffix("user")),
|
||||
userService: s,
|
||||
}
|
||||
}
|
||||
|
@ -60,3 +62,37 @@ func (m *UserMetrics) DeleteUser(ctx context.Context, id influxdb.ID) error {
|
|||
err := m.userService.DeleteUser(ctx, id)
|
||||
return rec(err)
|
||||
}
|
||||
|
||||
type PasswordMetrics struct {
|
||||
// RED metrics
|
||||
rec *metric.REDClient
|
||||
|
||||
pwdService influxdb.PasswordsService
|
||||
}
|
||||
|
||||
// NewPasswordMetrics returns a metrics service middleware for the Password Service.
|
||||
func NewPasswordMetrics(reg *prom.Registry, s influxdb.PasswordsService, opts ...MetricsOption) *PasswordMetrics {
|
||||
o := applyOpts(opts...)
|
||||
return &PasswordMetrics{
|
||||
rec: metric.New(reg, o.applySuffix("password")),
|
||||
pwdService: s,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *PasswordMetrics) SetPassword(ctx context.Context, userID influxdb.ID, password string) error {
|
||||
rec := m.rec.Record("set_password")
|
||||
err := m.pwdService.SetPassword(ctx, userID, password)
|
||||
return rec(err)
|
||||
}
|
||||
|
||||
func (m *PasswordMetrics) ComparePassword(ctx context.Context, userID influxdb.ID, password string) error {
|
||||
rec := m.rec.Record("compare_password")
|
||||
err := m.pwdService.ComparePassword(ctx, userID, password)
|
||||
return rec(err)
|
||||
}
|
||||
|
||||
func (m *PasswordMetrics) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, old, new string) error {
|
||||
rec := m.rec.Record("compare_and_set_password")
|
||||
err := m.pwdService.CompareAndSetPassword(ctx, userID, old, new)
|
||||
return rec(err)
|
||||
}
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package tenant
|
||||
|
||||
import "github.com/influxdata/influxdb"
|
||||
|
||||
type Service struct {
|
||||
store *Store
|
||||
}
|
||||
|
||||
func NewService(st *Store) *Service {
|
||||
func NewService(st *Store) influxdb.TenantService {
|
||||
return &Service{
|
||||
store: st,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue