refactor: migrator and introduce Store.(Create|Delete)Bucket (#18570)

* refactor: migrator and introduce Store.(Create|Delete)Bucket

feat: kvmigration internal utility for creating / managing kv store migrations

fix: ensure migrations applied in all test cases

* chore: update kv and migration documentation
pull/18785/head
George 2020-07-01 12:08:20 +01:00 committed by GitHub
parent f9903ddd56
commit 96d84b9126
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
148 changed files with 1813 additions and 1624 deletions

View File

@ -19,13 +19,22 @@ import (
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
itesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func NewTestInmemStore(t *testing.T) (kv.Store, func(), error) {
return inmem.NewKVStore(), func() {}, nil
t.Helper()
store := inmem.NewKVStore()
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
t.Fatal(err)
}
return store, func() {}, nil
}
func TestService_handlePostAuthorization(t *testing.T) {

View File

@ -11,9 +11,11 @@ import (
"github.com/influxdata/influxdb/v2/authorization"
influxdbcontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/tenant"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
var authorizationCmpOptions = cmp.Options{
@ -159,15 +161,16 @@ func TestAuthorizationService_ReadAuthorization(t *testing.T) {
}, 1, nil
}
// set up tenant service
ctx := context.Background()
st := inmem.NewKVStore()
store, err := tenant.NewStore(st)
if err != nil {
if err := all.Up(ctx, zaptest.NewLogger(t), st); err != nil {
t.Fatal(err)
}
store := tenant.NewStore(st)
ts := tenant.NewService(store)
s := authorization.NewAuthedAuthorizationService(m, ts)
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, tt.args.permissions))
t.Run("find authorization by id", func(t *testing.T) {
@ -304,15 +307,16 @@ func TestAuthorizationService_WriteAuthorization(t *testing.T) {
return nil, nil
}
// set up tenant service
ctx := context.Background()
st := inmem.NewKVStore()
store, err := tenant.NewStore(st)
if err != nil {
if err := all.Up(ctx, zaptest.NewLogger(t), st); err != nil {
t.Fatal(err)
}
store := tenant.NewStore(st)
ts := tenant.NewService(store)
s := authorization.NewAuthedAuthorizationService(m, ts)
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, tt.args.permissions))
t.Run("update authorization", func(t *testing.T) {
@ -443,17 +447,18 @@ func TestAuthorizationService_CreateAuthorization(t *testing.T) {
}
// set up tenant service
st := inmem.NewKVStore()
store, err := tenant.NewStore(st)
if err != nil {
ctx := context.Background()
if err := all.Up(ctx, zaptest.NewLogger(t), st); err != nil {
t.Fatal(err)
}
store := tenant.NewStore(st)
ts := tenant.NewService(store)
s := authorization.NewAuthedAuthorizationService(m, ts)
ctx := context.Background()
ctx = influxdbcontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(false, tt.args.permissions))
err = s.CreateAuthorization(ctx, &influxdb.Authorization{OrgID: 1, UserID: 1})
err := s.CreateAuthorization(ctx, &influxdb.Authorization{OrgID: 1, UserID: 1})
influxdbtesting.ErrorsEqual(t, err, tt.wants.err)
})
}

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/v2/authorization"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/tenant"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
@ -30,10 +31,7 @@ func initBoltAuthService(f influxdbtesting.AuthorizationFields, t *testing.T) (i
}
func initAuthService(s kv.Store, f influxdbtesting.AuthorizationFields, t *testing.T) (influxdb.AuthorizationService, func()) {
st, err := tenant.NewStore(s)
if err != nil {
t.Fatal(err)
}
st := tenant.NewStore(s)
ts := tenant.NewService(st)
storage, err := authorization.NewStore(s)
if err != nil {
@ -76,8 +74,16 @@ func NewTestBoltStore(t *testing.T) (kv.Store, func(), error) {
f.Close()
path := f.Name()
s := bolt.NewKVStore(zaptest.NewLogger(t), path)
if err := s.Open(context.Background()); err != nil {
ctx := context.Background()
logger := zaptest.NewLogger(t)
s := bolt.NewKVStore(logger, path)
if err := s.Open(ctx); err != nil {
return nil, nil, err
}
if err := all.Up(ctx, logger, s); err != nil {
return nil, nil, err
}

View File

@ -10,13 +10,11 @@ import (
"github.com/influxdata/influxdb/v2/authorization"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"go.uber.org/zap/zaptest"
)
func TestAuth(t *testing.T) {
s := func() kv.Store {
return inmem.NewKVStore()
}
setup := func(t *testing.T, store *authorization.Store, tx kv.Tx) {
for i := 1; i <= 10; i++ {
err := store.CreateAuthorization(context.Background(), tx, &influxdb.Authorization{
@ -81,7 +79,12 @@ func TestAuth(t *testing.T) {
for _, testScenario := range tt {
t.Run(testScenario.name, func(t *testing.T) {
ts, err := authorization.NewStore(s())
store := inmem.NewKVStore()
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
t.Fatal(err)
}
ts, err := authorization.NewStore(store)
if err != nil {
t.Fatal(err)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
_ "github.com/influxdata/influxdb/v2/query/builtin"
"github.com/pkg/errors"
@ -574,9 +575,11 @@ from(bucket:"holder") |> range(start:-5m) |> to(bucket:"holder", org:"thing")`
func newKVSVC(t *testing.T) *kv.Service {
t.Helper()
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
if err := svc.Initialize(context.Background()); err != nil {
store := inmem.NewKVStore()
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
t.Fatal(err)
}
return svc
return kv.NewService(zaptest.NewLogger(t), store)
}

View File

@ -3,6 +3,7 @@ package bolt
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
@ -15,11 +16,8 @@ import (
"go.uber.org/zap"
)
// check that *KVStore implement kv.Store interface.
var _ kv.Store = (*KVStore)(nil)
// ensure *KVStore implements kv.AutoMigrationStore.
var _ kv.AutoMigrationStore = (*KVStore)(nil)
// check that *KVStore implement kv.SchemaStore interface.
var _ kv.SchemaStore = (*KVStore)(nil)
// KVStore is a kv.Store backed by boltdb.
type KVStore struct {
@ -37,11 +35,6 @@ func NewKVStore(log *zap.Logger, path string) *KVStore {
}
}
// AutoMigrate returns itself as it is safe to automatically apply migrations on initialization.
func (s *KVStore) AutoMigrate() kv.Store {
return s
}
// Open creates boltDB file it doesn't exists and opens it otherwise.
func (s *KVStore) Open(ctx context.Context) error {
span, _ := tracing.StartSpanFromContext(ctx)
@ -133,6 +126,27 @@ func (s *KVStore) Update(ctx context.Context, fn func(tx kv.Tx) error) error {
})
}
// CreateBucket ensures a bucket with the given name exists in the
// underlying boltdb store, creating it when absent. It is a no-op if
// the bucket is already present.
func (s *KVStore) CreateBucket(ctx context.Context, name []byte) error {
	return s.db.Update(func(tx *bolt.Tx) error {
		if _, err := tx.CreateBucketIfNotExists(name); err != nil {
			return err
		}
		return nil
	})
}
// DeleteBucket removes the named bucket from the underlying boltdb
// store. Deleting a bucket that does not exist is treated as a no-op:
// bolt.ErrBucketNotFound is swallowed, any other error is returned.
func (s *KVStore) DeleteBucket(ctx context.Context, name []byte) error {
	return s.db.Update(func(tx *bolt.Tx) error {
		if err := tx.DeleteBucket(name); err != nil && !errors.Is(err, bolt.ErrBucketNotFound) {
			return err
		}
		return nil
	})
}
// Backup copies all K:Vs to a writer, in BoltDB format.
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
span, _ := tracing.StartSpanFromContext(ctx)
@ -160,22 +174,11 @@ func (tx *Tx) WithContext(ctx context.Context) {
tx.ctx = ctx
}
// createBucketIfNotExists creates (or fetches) the boltdb bucket
// stored under b and wraps it in the kv Bucket adapter.
func (tx *Tx) createBucketIfNotExists(b []byte) (*Bucket, error) {
	bkt, err := tx.tx.CreateBucketIfNotExists(b)
	if err != nil {
		return nil, err
	}
	return &Bucket{bucket: bkt}, nil
}
// Bucket retrieves the bucket named b.
func (tx *Tx) Bucket(b []byte) (kv.Bucket, error) {
bkt := tx.tx.Bucket(b)
if bkt == nil {
return tx.createBucketIfNotExists(b)
return nil, fmt.Errorf("bucket %q: %w", string(b), kv.ErrBucketNotFound)
}
return &Bucket{
bucket: bkt,

View File

@ -2,9 +2,11 @@ package bolt_test
import (
"context"
"fmt"
"testing"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
platformtesting "github.com/influxdata/influxdb/v2/testing"
)
@ -14,6 +16,8 @@ func initKVStore(f platformtesting.KVStoreFields, t *testing.T) (kv.Store, func(
t.Fatalf("failed to create new kv store: %v", err)
}
mustCreateBucket(t, s, f.Bucket)
err = s.Update(context.Background(), func(tx kv.Tx) error {
b, err := tx.Bucket(f.Bucket)
if err != nil {
@ -39,3 +43,13 @@ func initKVStore(f platformtesting.KVStoreFields, t *testing.T) (kv.Store, func(
func TestKVStore(t *testing.T) {
platformtesting.KVStore(initKVStore, t)
}
// mustCreateBucket applies a one-off CreateBuckets migration against
// store to provision the named bucket, failing the test on any error.
func mustCreateBucket(t testing.TB, store kv.SchemaStore, bucket []byte) {
	t.Helper()

	name := fmt.Sprintf("create bucket %q", string(bucket))
	m := migration.CreateBuckets(name, bucket)
	if err := m.Up(context.Background(), store); err != nil {
		t.Fatal(err)
	}
}

View File

@ -7,13 +7,22 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
_ "github.com/influxdata/influxdb/v2/query/builtin"
"github.com/influxdata/influxdb/v2/query/fluxlang"
"go.uber.org/zap/zaptest"
)
func NewKVTestStore(t *testing.T) (kv.Store, func()) {
return inmem.NewKVStore(), func() {}
t.Helper()
store := inmem.NewKVStore()
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
t.Fatal(err)
}
return store, func() {}
}
func TestCheckService(t *testing.T) {
@ -39,9 +48,6 @@ func initCheckService(f CheckFields, t *testing.T) (influxdb.CheckService, influ
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing check service: %v", err)
}
for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)

View File

@ -10,6 +10,8 @@ import (
"github.com/influxdata/influxdb/v2/cmd/influxd/internal/profile"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/pkg/data/gen"
"github.com/spf13/cobra"
"go.uber.org/zap"
@ -91,7 +93,18 @@ func assignOrgBucket(spec *gen.Spec) error {
}
s := kv.NewService(zap.NewNop(), store)
if err = s.Initialize(context.Background()); err != nil {
migrator, err := migration.NewMigrator(
zap.NewNop(),
store,
all.Migrations[:]...,
)
if err != nil {
return err
}
// apply migrations to metadata store
if err := migrator.Up(context.Background()); err != nil {
return err
}

View File

@ -38,6 +38,8 @@ import (
"github.com/influxdata/influxdb/v2/kit/tracing"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/label"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
@ -391,7 +393,7 @@ type Launcher struct {
queueSize int
boltClient *bolt.Client
kvStore kv.Store
kvStore kv.SchemaStore
kvService *kv.Service
engine Engine
StorageConfig storage.Config
@ -617,8 +619,19 @@ func (m *Launcher) run(ctx context.Context) (err error) {
return err
}
if err := m.kvService.Initialize(ctx); err != nil {
m.log.Error("Failed to initialize kv service", zap.Error(err))
migrator, err := migration.NewMigrator(
m.log.With(zap.String("service", "migrations")),
m.kvStore,
all.Migrations[:]...,
)
if err != nil {
m.log.Error("Failed to initialize kv migrator", zap.Error(err))
return err
}
// apply migrations to metadata store
if err := migrator.Up(ctx); err != nil {
m.log.Error("Failed to apply migrations", zap.Error(err))
return err
}
@ -644,12 +657,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
notificationEndpointStore platform.NotificationEndpointService = m.kvService
)
store, err := tenant.NewStore(m.kvStore)
if err != nil {
m.log.Error("Failed creating new meta store", zap.Error(err))
return err
}
ts := tenant.NewService(store)
tenantStore := tenant.NewStore(m.kvStore)
ts := tenant.NewService(tenantStore)
var (
userSvc platform.UserService = tenant.NewUserLogger(m.log.With(zap.String("store", "new")), tenant.NewUserMetrics(m.reg, ts, metric.WithSuffix("new")))
@ -806,11 +815,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}
}
dbrpSvc, err := dbrp.NewService(ctx, authorizer.NewBucketService(bucketSvc, userResourceSvc), m.kvStore)
if err != nil {
return err
}
dbrpSvc := dbrp.NewService(ctx, authorizer.NewBucketService(bucketSvc, userResourceSvc), m.kvStore)
dbrpSvc = dbrp.NewAuthorizedService(dbrpSvc)
var checkSvc platform.CheckService
@ -1044,7 +1049,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var onboardHTTPServer *tenant.OnboardHandler
{
onboardSvc := tenant.NewOnboardService(store, authSvc) // basic service
onboardSvc := tenant.NewOnboardService(tenantStore, authSvc) // basic service
onboardSvc = tenant.NewAuthedOnboardSvc(onboardSvc) // with auth
onboardSvc = tenant.NewOnboardingMetrics(m.reg, onboardSvc, metric.WithSuffix("new")) // with metrics
onboardSvc = tenant.NewOnboardingLogger(m.log.With(zap.String("handler", "onboard")), onboardSvc) // with logging
@ -1069,7 +1074,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
// feature flagging for new authorization service
var authHTTPServer *kithttp.FeatureHandler
{
ts := tenant.NewService(store) // todo (al): remove when tenant is un-flagged
ts := tenant.NewService(tenantStore) // todo (al): remove when tenant is un-flagged
authLogger := m.log.With(zap.String("handler", "authorization"))
oldBackend := http.NewAuthorizationBackend(authLogger, m.apibackend)

View File

@ -14,7 +14,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/mock"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
@ -36,14 +35,16 @@ func initHttpService(t *testing.T) (influxdb.DBRPMappingServiceV2, *httptest.Ser
},
}
s := inmem.NewKVStore()
svc, err := dbrp.NewService(ctx, bucketSvc, s)
s, closeS, err := NewTestBoltStore(t)
if err != nil {
t.Fatal(err)
}
svc := dbrp.NewService(ctx, bucketSvc, s)
server := httptest.NewServer(dbrp.NewHTTPHandler(zaptest.NewLogger(t), svc, orgSvc))
return svc, server, func() {
closeS()
server.Close()
}
}

View File

@ -63,21 +63,7 @@ func composeForeignKey(orgID influxdb.ID, db string) []byte {
return key
}
func NewService(ctx context.Context, bucketSvc influxdb.BucketService, st kv.Store) (influxdb.DBRPMappingServiceV2, error) {
if err := st.Update(ctx, func(tx kv.Tx) error {
_, err := tx.Bucket(bucket)
if err != nil {
return err
}
_, err = tx.Bucket(indexBucket)
if err != nil {
return err
}
_, err = tx.Bucket(defaultBucket)
return err
}); err != nil {
return nil, err
}
func NewService(ctx context.Context, bucketSvc influxdb.BucketService, st kv.Store) influxdb.DBRPMappingServiceV2 {
return &Service{
store: st,
IDGen: snowflake.NewDefaultIDGenerator(),
@ -89,7 +75,7 @@ func NewService(ctx context.Context, bucketSvc influxdb.BucketService, st kv.Sto
}
return indexForeignKey(dbrp), nil
}), kv.WithIndexReadPathEnabled),
}, nil
}
}
// getDefault gets the default mapping ID inside of a transaction.

View File

@ -12,24 +12,33 @@ import (
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
itesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
)
func NewTestBoltStore(t *testing.T) (kv.Store, func(), error) {
t.Helper()
f, err := ioutil.TempFile("", "influxdata-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
}
f.Close()
ctx := context.Background()
logger := zaptest.NewLogger(t)
path := f.Name()
s := bolt.NewKVStore(zaptest.NewLogger(t), path)
s := bolt.NewKVStore(logger, path)
if err := s.Open(context.Background()); err != nil {
return nil, nil, err
}
if err := all.Up(ctx, logger, s); err != nil {
return nil, nil, err
}
close := func() {
s.Close()
os.Remove(path)
@ -44,10 +53,6 @@ func initDBRPMappingService(f itesting.DBRPMappingFieldsV2, t *testing.T) (influ
t.Fatalf("failed to create new bolt kv store: %v", err)
}
ks := kv.NewService(zaptest.NewLogger(t), s)
if err := ks.Initialize(context.Background()); err != nil {
t.Fatal(err)
}
if f.BucketSvc == nil {
f.BucketSvc = &mock.BucketService{
FindBucketByIDFn: func(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error) {
@ -59,10 +64,9 @@ func initDBRPMappingService(f itesting.DBRPMappingFieldsV2, t *testing.T) (influ
},
}
}
svc, err := dbrp.NewService(context.Background(), f.BucketSvc, s)
if err != nil {
t.Fatal(err)
}
svc := dbrp.NewService(context.Background(), f.BucketSvc, s)
if err := f.Populate(context.Background(), svc); err != nil {
t.Fatal(err)
}

View File

@ -13,7 +13,6 @@ import (
"github.com/influxdata/httprouter"
platform "github.com/influxdata/influxdb/v2"
pcontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/inmem"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
@ -869,7 +868,8 @@ func initAuthorizationService(f platformtesting.AuthorizationFields, t *testing.
t.Skip("HTTP authorization service does not required a user id on the authentication struct. We get the user from the session token.")
}
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
store := NewTestInmemStore(t)
svc := kv.NewService(zaptest.NewLogger(t), store)
svc.IDGenerator = f.IDGenerator
svc.TokenGenerator = f.TokenGenerator
svc.TimeGenerator = f.TimeGenerator

View File

@ -13,7 +13,6 @@ import (
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
@ -1173,7 +1172,10 @@ func TestService_handlePostBucketOwner(t *testing.T) {
}
func initBucketService(f platformtesting.BucketFields, t *testing.T) (influxdb.BucketService, string, func()) {
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
ctx := context.Background()
logger := zaptest.NewLogger(t)
store := NewTestInmemStore(t)
svc := kv.NewService(logger, store)
svc.IDGenerator = f.IDGenerator
svc.OrgBucketIDs = f.OrgBucketIDs
svc.TimeGenerator = f.TimeGenerator
@ -1181,11 +1183,6 @@ func initBucketService(f platformtesting.BucketFields, t *testing.T) (influxdb.B
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatal(err)
}
for _, o := range f.Organizations {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")

View File

@ -14,7 +14,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/httprouter"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
@ -1915,9 +1914,5 @@ func jsonEqual(s1, s2 string) (eq bool, diff string, err error) {
func newInMemKVSVC(t *testing.T) *kv.Service {
t.Helper()
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
if err := svc.Initialize(context.Background()); err != nil {
t.Fatal(err)
}
return svc
return kv.NewService(zaptest.NewLogger(t), NewTestInmemStore(t))
}

View File

@ -11,7 +11,6 @@ import (
"github.com/influxdata/influxdb/v2/authorizer"
icontext "github.com/influxdata/influxdb/v2/context"
httpmock "github.com/influxdata/influxdb/v2/http/mock"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
@ -19,7 +18,7 @@ import (
"go.uber.org/zap/zaptest"
)
const namespace = "testing"
const namespace = "templates"
type fixture struct {
Org *influxdb.Organization
@ -29,14 +28,13 @@ type fixture struct {
}
func setup(t *testing.T) (func(auth influxdb.Authorizer) *httptest.Server, func(serverUrl string) DocumentService, fixture) {
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
ctx := context.Background()
// Need this to make resource creation work.
// We are not testing authorization in the setup.
ctx = icontext.SetAuthorizer(ctx, mock.NewMockAuthorizer(true, nil))
if err := svc.Initialize(ctx); err != nil {
t.Fatal(err)
}
store := NewTestInmemStore(t)
svc := kv.NewService(zaptest.NewLogger(t), store)
ds, err := svc.CreateDocumentStore(ctx, namespace)
if err != nil {
t.Fatalf("failed to create document store: %v", err)

View File

@ -12,7 +12,6 @@ import (
"github.com/influxdata/httprouter"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
@ -595,14 +594,11 @@ func TestService_handlePatchLabel(t *testing.T) {
}
func initLabelService(f platformtesting.LabelFields, t *testing.T) (platform.LabelService, string, func()) {
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
store := NewTestInmemStore(t)
svc := kv.NewService(zaptest.NewLogger(t), store)
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatal(err)
}
for _, l := range f.Labels {
if err := svc.PutLabel(ctx, l); err != nil {
t.Fatalf("failed to populate labels: %v", err)

View File

@ -14,7 +14,6 @@ import (
"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
pcontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/inmem"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
@ -1062,15 +1061,13 @@ func TestService_handlePostNotificationEndpointOwner(t *testing.T) {
}
func initNotificationEndpointService(f influxTesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
ctx := context.Background()
store := NewTestInmemStore(t)
logger := zaptest.NewLogger(t)
svc := kv.NewService(logger, store)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatal(err)
}
for _, v := range f.Orgs {
if err := svc.PutOrganization(ctx, v); err != nil {
t.Fatalf("failed to replace org: %v", err)

View File

@ -6,7 +6,6 @@ import (
"testing"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/mock"
@ -24,7 +23,8 @@ func NewMockSetupBackend(t *testing.T) *SetupBackend {
func initOnboardingService(f platformtesting.OnboardingFields, t *testing.T) (platform.OnboardingService, func()) {
t.Helper()
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
store := NewTestInmemStore(t)
svc := kv.NewService(zaptest.NewLogger(t), store)
svc.IDGenerator = f.IDGenerator
svc.OrgBucketIDs = f.IDGenerator
svc.TokenGenerator = f.TokenGenerator

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/influxdb/v2/inmem"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/mock"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
@ -35,7 +36,11 @@ func NewMockOrgBackend(t *testing.T) *OrgBackend {
func initOrganizationService(f influxdbtesting.OrganizationFields, t *testing.T) (influxdb.OrganizationService, string, func()) {
t.Helper()
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
ctx := context.Background()
logger := zaptest.NewLogger(t)
store := NewTestInmemStore(t)
svc := kv.NewService(logger, store)
svc.IDGenerator = f.IDGenerator
svc.OrgBucketIDs = f.OrgBucketIDs
svc.TimeGenerator = f.TimeGenerator
@ -43,11 +48,6 @@ func initOrganizationService(f influxdbtesting.OrganizationFields, t *testing.T)
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatal(err)
}
for _, o := range f.Organizations {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")
@ -70,13 +70,16 @@ func initOrganizationService(f influxdbtesting.OrganizationFields, t *testing.T)
func initSecretService(f influxdbtesting.SecretServiceFields, t *testing.T) (influxdb.SecretService, func()) {
t.Helper()
svc := kv.NewService(zaptest.NewLogger(t), inmem.NewKVStore())
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
store := inmem.NewKVStore()
logger := zaptest.NewLogger(t)
if err := all.Up(ctx, logger, store); err != nil {
t.Fatal(err)
}
svc := kv.NewService(logger, store)
for _, ss := range f.Secrets {
if err := svc.PutSecrets(ctx, ss.OrganizationID, ss.Env); err != nil {
t.Fatalf("failed to populate secrets")

23
http/service_test.go Normal file
View File

@ -0,0 +1,23 @@
package http
import (
"context"
"testing"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"go.uber.org/zap/zaptest"
)
// NewTestInmemStore returns an in-memory kv.Store with all known
// migrations applied, failing t if the migrations cannot be run.
func NewTestInmemStore(t *testing.T) kv.Store {
	t.Helper()

	ctx := context.Background()
	store := inmem.NewKVStore()
	if err := all.Up(ctx, zaptest.NewLogger(t), store); err != nil {
		t.Fatal(err)
	}
	return store
}

View File

@ -11,11 +11,8 @@ import (
"github.com/influxdata/influxdb/v2/kv"
)
// ensure *KVStore implement kv.Store interface
var _ kv.Store = (*KVStore)(nil)
// ensure *KVStore implements kv.AutoMigrationStore
var _ kv.AutoMigrationStore = (*KVStore)(nil)
// ensure *KVStore implement kv.SchemaStore interface
var _ kv.SchemaStore = (*KVStore)(nil)
// cursorBatchSize is the size of a batch sent by a forward cursors
// tree iterator
@ -60,13 +57,35 @@ func (s *KVStore) Update(ctx context.Context, fn func(kv.Tx) error) error {
})
}
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
panic("not implemented")
// CreateBucket ensures an in-memory bucket with the provided name
// exists, lazily allocating an empty btree-backed bucket (plus its
// read-only mirror in s.ro) when absent.
func (s *KVStore) CreateBucket(ctx context.Context, name []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := string(name)
	if _, exists := s.buckets[key]; !exists {
		bkt := &Bucket{btree: btree.New(2)}
		s.buckets[key] = bkt
		s.ro[key] = &bucket{Bucket: bkt}
	}
	return nil
}
// AutoMigrate returns itlsef as *KVStore is safe to migrate automically on initialize.
func (s *KVStore) AutoMigrate() kv.Store {
return s
// DeleteBucket removes the bucket with the provided name, if present.
// Deleting a bucket that does not exist is a no-op.
func (s *KVStore) DeleteBucket(ctx context.Context, name []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.buckets, string(name))
	// CreateBucket also registers a read-only view in s.ro; drop it
	// too so a deleted bucket does not leave a stale entry behind.
	delete(s.ro, string(name))
	return nil
}
// Backup copies all K:Vs to a writer. It is not supported by the
// in-memory store and panics if called.
func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
	panic("not implemented")
}
// Flush removes all data from the buckets. Used for testing.
@ -108,28 +127,11 @@ func (t *Tx) WithContext(ctx context.Context) {
t.ctx = ctx
}
// createBucketIfNotExists returns the btree bucket stored under key b,
// creating it first (along with its read-only mirror) when the
// transaction is writable. Read-only transactions cannot create
// buckets and receive kv.ErrTxNotWritable.
func (t *Tx) createBucketIfNotExists(b []byte) (kv.Bucket, error) {
	if !t.writable {
		return nil, kv.ErrTxNotWritable
	}

	key := string(b)
	bkt, ok := t.kv.buckets[key]
	if !ok {
		bkt = &Bucket{btree: btree.New(2)}
		t.kv.buckets[key] = bkt
		t.kv.ro[key] = &bucket{Bucket: bkt}
	}
	return bkt, nil
}
// Bucket retrieves the bucket at the provided key.
func (t *Tx) Bucket(b []byte) (kv.Bucket, error) {
bkt, ok := t.kv.buckets[string(b)]
if !ok {
return t.createBucketIfNotExists(b)
return nil, fmt.Errorf("bucket %q: %w", string(b), kv.ErrBucketNotFound)
}
if t.writable {

View File

@ -3,6 +3,7 @@ package inmem_test
import (
"bufio"
"context"
"fmt"
"math"
"os"
"reflect"
@ -12,12 +13,15 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
platformtesting "github.com/influxdata/influxdb/v2/testing"
)
func initKVStore(f platformtesting.KVStoreFields, t *testing.T) (kv.Store, func()) {
s := inmem.NewKVStore()
mustCreateBucket(t, s, f.Bucket)
err := s.Update(context.Background(), func(tx kv.Tx) error {
b, err := tx.Bucket(f.Bucket)
if err != nil {
@ -62,17 +66,10 @@ func TestKVStore_Buckets(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := inmem.NewKVStore()
err := s.Update(context.Background(), func(tx kv.Tx) error {
for _, b := range tt.buckets {
if _, err := tx.Bucket([]byte(b)); err != nil {
return err
}
}
return nil
})
if err != nil {
t.Fatalf("unable to setup store with buckets: %v", err)
for _, b := range tt.buckets {
mustCreateBucket(t, s, []byte(b))
}
got := s.Buckets(context.Background())
sort.Slice(got, func(i, j int) bool {
return string(got[i]) < string(got[j])
@ -86,8 +83,9 @@ func TestKVStore_Buckets(t *testing.T) {
func TestKVStore_Bucket_CursorHintPredicate(t *testing.T) {
s := inmem.NewKVStore()
bucket := "urm"
mustCreateBucket(t, s, []byte(bucket))
fillBucket(t, s, bucket, 10)
t.Run("filter by key", func(t *testing.T) {
@ -176,6 +174,7 @@ func BenchmarkKVStore_Bucket_Cursor(b *testing.B) {
b.Run("16000 keys", func(b *testing.B) {
s := inmem.NewKVStore()
bucket := "urm"
mustCreateBucket(b, s, []byte(bucket))
fillBucket(b, s, bucket, 0)
b.Run("without hint", func(b *testing.B) {
@ -236,3 +235,13 @@ func fillBucket(t testing.TB, s *inmem.KVStore, bucket string, lines int64) {
t.Fatalf("unexpected error: %v", err)
}
}
// mustCreateBucket provisions the named bucket on store by running a
// one-off CreateBuckets migration, failing the test on any error.
func mustCreateBucket(t testing.TB, store kv.SchemaStore, bucket []byte) {
	t.Helper()

	m := migration.CreateBuckets(fmt.Sprintf("create bucket %q", string(bucket)), bucket)
	if err := m.Up(context.Background(), store); err != nil {
		t.Fatal(err)
	}
}

View File

@ -0,0 +1,32 @@
package main
import (
"fmt"
"os"
"strings"
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/influxdata/influxdb/v2/kv/migration/all"
)
var usageMsg = "Usage: kvmigrate create <migration name / description>"
// usage prints the command usage and exits with a non-zero status.
// Diagnostic output for a misused CLI belongs on stderr, not stdout.
func usage() {
	fmt.Fprintln(os.Stderr, usageMsg)
	os.Exit(1)
}
// main implements the kvmigrate command-line tool. It supports a
// single subcommand, "create", which scaffolds a new kv store
// migration named after the remaining arguments.
func main() {
	if len(os.Args) < 3 {
		usage()
	}

	cmd := os.Args[1]
	if cmd != "create" {
		fmt.Printf("unrecognized command %q\n", cmd)
		usage()
	}

	name := strings.Join(os.Args[2:], " ")
	if err := migration.CreateNewMigration(all.Migrations[:], name); err != nil {
		panic(err)
	}
}

View File

@ -17,16 +17,6 @@ var (
var _ influxdb.AuthorizationService = (*Service)(nil)
func (s *Service) initializeAuths(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(authBucket); err != nil {
return err
}
if _, err := authIndexBucket(tx); err != nil {
return err
}
return nil
}
// FindAuthorizationByID retrieves a authorization by id.
func (s *Service) FindAuthorizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Authorization, error) {
var a *influxdb.Authorization

View File

@ -27,17 +27,13 @@ func initBoltAuthorizationService(f influxdbtesting.AuthorizationFields, t *test
}
}
func initAuthorizationService(s kv.Store, f influxdbtesting.AuthorizationFields, t *testing.T) (influxdb.AuthorizationService, string, func()) {
func initAuthorizationService(s kv.SchemaStore, f influxdbtesting.AuthorizationFields, t *testing.T) (influxdb.AuthorizationService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
svc.TokenGenerator = f.TokenGenerator
svc.TimeGenerator = f.TimeGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing authorization service: %v", err)
}
for _, u := range f.Users {
if err := svc.PutUser(ctx, u); err != nil {
t.Fatalf("failed to populate users")

View File

@ -20,16 +20,6 @@ var (
var _ influxdb.BucketService = (*Service)(nil)
var _ influxdb.BucketOperationLogService = (*Service)(nil)
func (s *Service) initializeBuckets(ctx context.Context, tx Tx) error {
if _, err := s.bucketsBucket(tx); err != nil {
return err
}
if _, err := s.bucketsIndexBucket(tx); err != nil {
return err
}
return nil
}
func (s *Service) bucketsBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(bucketBucket)
if err != nil {

View File

@ -27,7 +27,8 @@ func initBoltBucketService(f influxdbtesting.BucketFields, t *testing.T) (influx
}
}
func initBucketService(s kv.Store, f influxdbtesting.BucketFields, t *testing.T) (influxdb.BucketService, string, func()) {
func initBucketService(s kv.SchemaStore, f influxdbtesting.BucketFields, t *testing.T) (influxdb.BucketService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.OrgBucketIDs = f.OrgBucketIDs
svc.IDGenerator = f.IDGenerator
@ -36,10 +37,6 @@ func initBucketService(s kv.Store, f influxdbtesting.BucketFields, t *testing.T)
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing bucket service: %v", err)
}
for _, o := range f.Organizations {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate organizations")

View File

@ -10,6 +10,11 @@ import (
var _ influxdb.CheckService = (*Service)(nil)
var (
checkBucket = []byte("checksv1")
checkIndexBucket = []byte("checkindexv1")
)
func newCheckStore() *IndexStore {
const resource = "check"
@ -32,8 +37,8 @@ func newCheckStore() *IndexStore {
return &IndexStore{
Resource: resource,
EntStore: NewStoreBase(resource, []byte("checksv1"), EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, []byte("checkindexv1"), false),
EntStore: NewStoreBase(resource, checkBucket, EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, checkIndexBucket, false),
}
}

View File

@ -28,7 +28,8 @@ func initBoltCheckService(f influxdbtesting.CheckFields, t *testing.T) (influxdb
}
}
func initCheckService(s kv.Store, f influxdbtesting.CheckFields, t *testing.T) (*kv.Service, string, func()) {
func initCheckService(s kv.SchemaStore, f influxdbtesting.CheckFields, t *testing.T) (*kv.Service, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
@ -38,10 +39,6 @@ func initCheckService(s kv.Store, f influxdbtesting.CheckFields, t *testing.T) (
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing check service: %v", err)
}
for _, m := range f.UserResourceMappings {
if err := svc.CreateUserResourceMapping(ctx, m); err != nil {
t.Fatalf("failed to populate user resource mapping: %v", err)

View File

@ -33,19 +33,6 @@ const (
var _ influxdb.DashboardService = (*Service)(nil)
var _ influxdb.DashboardOperationLogService = (*Service)(nil)
func (s *Service) initializeDashboards(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(dashboardBucket); err != nil {
return err
}
if _, err := tx.Bucket(orgDashboardIndex); err != nil {
return err
}
if _, err := tx.Bucket(dashboardCellViewBucket); err != nil {
return err
}
return nil
}
// FindDashboardByID retrieves a dashboard by id.
func (s *Service) FindDashboardByID(ctx context.Context, id influxdb.ID) (*influxdb.Dashboard, error) {
var d *influxdb.Dashboard

View File

@ -27,20 +27,17 @@ func initBoltDashboardService(f influxdbtesting.DashboardFields, t *testing.T) (
}
}
func initDashboardService(s kv.Store, f influxdbtesting.DashboardFields, t *testing.T) (influxdb.DashboardService, string, func()) {
func initDashboardService(s kv.SchemaStore, f influxdbtesting.DashboardFields, t *testing.T) (influxdb.DashboardService, string, func()) {
if f.TimeGenerator == nil {
f.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing organization service: %v", err)
}
for _, b := range f.Dashboards {
if err := svc.PutDashboard(ctx, b); err != nil {
t.Fatalf("failed to populate dashboards")

17
kv/doc.go Normal file
View File

@ -0,0 +1,17 @@
// Package kv provides a set of services and abstractions built around key value storage.
//
// There exist in-memory and persisted implementations of the core `Store` family of
// interfaces outside of this package (see `inmem` and `bolt` packages).
//
// The `Store` interface exposes transactional access to a backing kv persistence layer.
// It allows for read-only (View) and read-write (Update) transactions to be opened.
// These methods take a function which is passed an implementation of the transaction interface (Tx).
// This interface exposes a way to manipulate namespaced keys and values (Buckets).
//
// All keys and values are namespaced (grouped) using buckets. Buckets can only be created on
// implementations of the `SchemaStore` interface. This is a superset of the `Store` interface,
// which has the additional bucket creation and deletion methods.
//
// Bucket creation and deletion should be facilitated via a migration (see `kv/migration`).
package kv

View File

@ -13,14 +13,6 @@ const (
documentMetaBucket = "/documents/meta"
)
func (s *Service) initializeDocuments(ctx context.Context, tx Tx) error {
if _, err := s.createDocumentStore(ctx, tx, "templates"); err != nil {
return err
}
return nil
}
// DocumentStore implements influxdb.DocumentStore.
type DocumentStore struct {
service *Service
@ -30,34 +22,10 @@ type DocumentStore struct {
// CreateDocumentStore creates an instance of a document store by instantiating the buckets for the store.
func (s *Service) CreateDocumentStore(ctx context.Context, ns string) (influxdb.DocumentStore, error) {
// TODO(desa): keep track of which namespaces exist.
var ds influxdb.DocumentStore
err := s.kv.Update(ctx, func(tx Tx) error {
store, err := s.createDocumentStore(ctx, tx, ns)
if err != nil {
return err
}
ds = store
return nil
})
if err != nil {
return nil, err
}
return ds, nil
return s.createDocumentStore(ctx, ns)
}
func (s *Service) createDocumentStore(ctx context.Context, tx Tx, ns string) (influxdb.DocumentStore, error) {
if _, err := tx.Bucket([]byte(path.Join(ns, documentContentBucket))); err != nil {
return nil, err
}
if _, err := tx.Bucket([]byte(path.Join(ns, documentMetaBucket))); err != nil {
return nil, err
}
func (s *Service) createDocumentStore(ctx context.Context, ns string) (influxdb.DocumentStore, error) {
return &DocumentStore{
namespace: ns,
service: s,

View File

@ -15,10 +15,7 @@ func initBoltDuplicateReadBucketService(f influxdbtesting.BucketFields, t *testi
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initBucketService(s, f, t)
ro, err := tenant.NewReadOnlyStore(s)
if err != nil {
t.Fatal(err)
}
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadBucketService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
@ -37,10 +34,7 @@ func initBoltDuplicateReadOrganizationService(f influxdbtesting.OrganizationFiel
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initOrganizationService(s, f, t)
ro, err := tenant.NewReadOnlyStore(s)
if err != nil {
t.Fatal(err)
}
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadOrganizationService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
@ -59,10 +53,7 @@ func initBoltDuplicateReadUserResourceMappingService(f influxdbtesting.UserResou
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initUserResourceMappingService(s, f, t)
ro, err := tenant.NewReadOnlyStore(s)
if err != nil {
t.Fatal(err)
}
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadUserResourceMappingService(zaptest.NewLogger(t), svc, newSvc)
return svc, func() {
@ -81,10 +72,7 @@ func initBoltDuplicateReadUserService(f influxdbtesting.UserFields, t *testing.T
t.Fatalf("failed to create new kv store: %v", err)
}
svc, op, closeSvc := initUserService(s, f, t)
ro, err := tenant.NewReadOnlyStore(s)
if err != nil {
t.Fatal(err)
}
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadUserService(zaptest.NewLogger(t), svc, newSvc)
return svc, op, func() {
@ -103,10 +91,7 @@ func initBoltDuplicateReadPasswordsService(f influxdbtesting.PasswordFields, t *
t.Fatalf("failed to create new kv store: %v", err)
}
svc, closeSvc := initPasswordsService(s, f, t)
ro, err := tenant.NewReadOnlyStore(s)
if err != nil {
t.Fatal(err)
}
ro := tenant.NewReadOnlyStore(s)
newSvc := tenant.NewService(ro)
svc = tenant.NewDuplicateReadPasswordsService(zaptest.NewLogger(t), svc, newSvc)
return svc, func() {

View File

@ -4,11 +4,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
)
const (
defaultPopulateBatchSize = 100
)
// Index is used to define and manage an index for a source bucket.
@ -50,17 +45,12 @@ const (
// return nil
// })
//
// // populate entire index from source
// indexedCount, err := indexByUser.Populate(ctx, store)
//
// // verify the current index against the source and return the differences
// // found in each
// diff, err := indexByUser.Verify(ctx, tx)
type Index struct {
IndexMapping
// populateBatchSize configures the size of the batch used for insertion
populateBatchSize int
// canRead configures whether or not Walk accesses the index at all
// or skips the index altogether and returns nothing.
// This is used when you want to integrate only the write path before
@ -78,14 +68,6 @@ func WithIndexReadPathEnabled(i *Index) {
i.canRead = true
}
// WithIndexPopulateBatchSize configures the size of each batch
// used when fully populating an index. (number of puts per tx)
func WithIndexPopulateBatchSize(n int) IndexOption {
return func(i *Index) {
i.populateBatchSize = n
}
}
// IndexMapping is a type which configures and Index to map items
// from a source bucket to an index bucket via a mapping known as
// IndexSourceOn. This function is called on the values in the source
@ -129,10 +111,7 @@ func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) Ind
// be fully populated before depending upon the read path.
// The read path can be enabled using WithIndexReadPathEnabled option.
func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
index := &Index{
IndexMapping: mapping,
populateBatchSize: defaultPopulateBatchSize,
}
index := &Index{IndexMapping: mapping}
for _, opt := range opts {
opt(index)
@ -141,14 +120,6 @@ func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
return index
}
func (i *Index) initialize(ctx context.Context, store Store) error {
return store.Update(ctx, func(tx Tx) error {
// create bucket if not exist
_, err := tx.Bucket(i.IndexBucket())
return err
})
}
func (i *Index) indexBucket(tx Tx) (Bucket, error) {
return tx.Bucket(i.IndexBucket())
}
@ -179,56 +150,6 @@ func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) {
return
}
// ensure IndexMigration implements MigrationSpec
var _ MigrationSpec = (*IndexMigration)(nil)
// IndexMigration is a migration for adding and removing an index.
// These are constructed via the Index.Migration function.
type IndexMigration struct {
*Index
opts []PopulateOption
}
// Name returns a readable name for the index migration.
func (i *IndexMigration) MigrationName() string {
return fmt.Sprintf("add index %q", string(i.IndexBucket()))
}
// Up initializes the index bucket and populates the index.
func (i *IndexMigration) Up(ctx context.Context, store Store) (err error) {
wrapErr := func(err error) error {
if err == nil {
return nil
}
return fmt.Errorf("migration (up) %s: %w", i.MigrationName(), err)
}
if err = i.initialize(ctx, store); err != nil {
return wrapErr(err)
}
_, err = i.Populate(ctx, store, i.opts...)
return wrapErr(err)
}
// Down deletes all entries from the index.
func (i *IndexMigration) Down(ctx context.Context, store Store) error {
if err := i.DeleteAll(ctx, store); err != nil {
return fmt.Errorf("migration (down) %s: %w", i.MigrationName(), err)
}
return nil
}
// Migration creates an IndexMigration for the underlying Index.
func (i *Index) Migration(opts ...PopulateOption) *IndexMigration {
return &IndexMigration{
Index: i,
opts: opts,
}
}
// Insert creates a single index entry for the provided primary key on the foreign key.
func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
bkt, err := i.indexBucket(tx)
@ -277,159 +198,37 @@ func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn Visi
return indexWalk(ctx, cursor, sourceBucket, visitFn)
}
// PopulateConfig configures a call to Populate
type PopulateConfig struct {
RemoveDanglingForeignKeys bool
}
// PopulateOption is a functional option for the Populate call
type PopulateOption func(*PopulateConfig)
// WithPopulateRemoveDanglingForeignKeys removes index entries which point to
// missing items in the source bucket.
func WithPopulateRemoveDanglingForeignKeys(c *PopulateConfig) {
c.RemoveDanglingForeignKeys = true
}
// Populate does a full population of the index using the IndexSourceOn IndexMapping function.
// Once completed it marks the index as ready for use.
// It return a nil error on success and the count of inserted items.
func (i *Index) Populate(ctx context.Context, store Store, opts ...PopulateOption) (n int, err error) {
var config PopulateConfig
for _, opt := range opts {
opt(&config)
// indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their
// associated primaryKey's value in the provided source bucket.
// When an item is located in the source, the provided visit function is called with primary key and associated value.
func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
var keys [][]byte
for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
keys = append(keys, pk)
}
// verify the index to derive missing index
// we can skip missing source lookup as we're
// only interested in populating the missing index
diff, err := i.verify(ctx, store, config.RemoveDanglingForeignKeys)
if err != nil {
return 0, fmt.Errorf("looking up missing indexes: %w", err)
if err := indexCursor.Err(); err != nil {
return err
}
flush := func(batch kvSlice) error {
if len(batch) == 0 {
return nil
}
if err := store.Update(ctx, func(tx Tx) error {
indexBucket, err := i.indexBucket(tx)
if err != nil {
return err
}
for _, pair := range batch {
// insert missing item into index
if err := indexBucket.Put(pair[0], pair[1]); err != nil {
return err
}
n++
}
return nil
}); err != nil {
return fmt.Errorf("updating index: %w", err)
}
return nil
if err := indexCursor.Close(); err != nil {
return err
}
var batch kvSlice
for fk, fkm := range diff.MissingFromIndex {
for pk := range fkm {
batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)})
if len(batch) >= i.populateBatchSize {
if err := flush(batch); err != nil {
return n, err
}
batch = batch[:0]
}
}
}
if err := flush(batch); err != nil {
return n, err
}
if config.RemoveDanglingForeignKeys {
return n, i.remove(ctx, store, diff.MissingFromSource)
}
return n, nil
}
// DeleteAll removes the entire index in batches
func (i *Index) DeleteAll(ctx context.Context, store Store) error {
diff, err := i.verify(ctx, store, true)
values, err := sourceBucket.GetBatch(keys...)
if err != nil {
return err
}
for k, v := range diff.MissingFromSource {
if fkm, ok := diff.PresentInIndex[k]; ok {
for pk := range v {
fkm[pk] = struct{}{}
}
continue
}
diff.PresentInIndex[k] = v
}
return i.remove(ctx, store, diff.PresentInIndex)
}
func (i *Index) remove(ctx context.Context, store Store, mappings map[string]map[string]struct{}) error {
var (
batch [][]byte
flush = func(batch [][]byte) error {
if len(batch) == 0 {
return nil
}
if err := store.Update(ctx, func(tx Tx) error {
indexBucket, err := i.indexBucket(tx)
if err != nil {
return err
}
for _, indexKey := range batch {
// delete dangling foreign key
if err := indexBucket.Delete(indexKey); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("removing dangling foreign keys: %w", err)
}
return nil
}
)
for fk, fkm := range mappings {
for pk := range fkm {
batch = append(batch, indexKey([]byte(fk), []byte(pk)))
if len(batch) >= i.populateBatchSize {
if err := flush(batch); err != nil {
return err
}
batch = batch[:0]
for i, value := range values {
if value != nil {
if err := visit(keys[i], value); err != nil {
return err
}
}
}
return flush(batch)
return nil
}
// IndexDiff contains a set of items present in the source not in index,
@ -491,16 +290,20 @@ func (i *IndexDiff) Corrupt() (corrupt []string) {
// The difference contains items in the source that are not in the index
// and vice-versa.
func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error) {
return i.verify(ctx, store, true)
return indexVerify(ctx, i, store, true)
}
func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bool) (diff IndexDiff, err error) {
diff.PresentInIndex, err = i.readEntireIndex(ctx, store)
func indexVerify(ctx context.Context, mapping IndexMapping, store Store, includeMissingSource bool) (diff IndexDiff, err error) {
diff.PresentInIndex, err = indexReadAll(ctx, store, func(tx Tx) (Bucket, error) {
return tx.Bucket(mapping.IndexBucket())
})
if err != nil {
return diff, err
}
sourceKVs, err := consumeBucket(ctx, store, i.sourceBucket)
sourceKVs, err := consumeBucket(ctx, store, func(tx Tx) (Bucket, error) {
return tx.Bucket(mapping.SourceBucket())
})
if err != nil {
return diff, err
}
@ -517,7 +320,7 @@ func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bo
pks[string(pk)] = struct{}{}
}
fk, err := i.IndexSourceOn(v)
fk, err := mapping.IndexSourceOn(v)
if err != nil {
return diff, err
}
@ -546,42 +349,9 @@ func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bo
return
}
// indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their
// associated primaryKey's value in the provided source bucket.
// When an item is located in the source, the provided visit function is called with primary key and associated value.
func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
var keys [][]byte
for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
keys = append(keys, pk)
}
if err := indexCursor.Err(); err != nil {
return err
}
if err := indexCursor.Close(); err != nil {
return err
}
values, err := sourceBucket.GetBatch(keys...)
if err != nil {
return err
}
for i, value := range values {
if value != nil {
if err := visit(keys[i], value); err != nil {
return err
}
}
}
return nil
}
// readEntireIndex returns the entire current state of the index
func (i *Index) readEntireIndex(ctx context.Context, store Store) (map[string]map[string]struct{}, error) {
kvs, err := consumeBucket(ctx, store, i.indexBucket)
// indexReadAll returns the entire current state of the index
func indexReadAll(ctx context.Context, store Store, indexBucket func(Tx) (Bucket, error)) (map[string]map[string]struct{}, error) {
kvs, err := consumeBucket(ctx, store, indexBucket)
if err != nil {
return nil, err
}

199
kv/index_migration.go Normal file
View File

@ -0,0 +1,199 @@
package kv
import (
"context"
"fmt"
)
const (
// defaultIndexMigrationOpBatchSize configures the size of batch operations
// done by the index migration when populating or removing items from an
// entire index
defaultIndexMigrationOpBatchSize = 100
)
// IndexMigration is a migration for adding and removing an index.
// These are constructed via the NewIndexMigration function.
type IndexMigration struct {
	IndexMapping

	// operationBatchSize is the number of index entries written or deleted
	// per transaction when populating or clearing the entire index.
	operationBatchSize int
	// removeDanglingForeignKeys, when true, causes Populate to also delete
	// index entries whose items are missing from the source bucket.
	removeDanglingForeignKeys bool
}
// IndexMigrationOption is a functional option for the IndexMigration type
type IndexMigrationOption func(*IndexMigration)
// WithIndexMigationBatchSize configures the size of the batches when committing
// changes to entire index during migration (e.g. size of put batch on index populate).
//
// NOTE(review): the name is missing the "r" in "Migration"; it is kept as-is
// because renaming an exported identifier would break callers — consider adding
// a correctly spelled alias and deprecating this one.
func WithIndexMigationBatchSize(n int) IndexMigrationOption {
	return func(m *IndexMigration) {
		m.operationBatchSize = n
	}
}
// WithIndexMigrationCleanup removes index entries which point to
// missing items in the source bucket. It is itself an IndexMigrationOption
// and is passed directly (not called) to NewIndexMigration.
func WithIndexMigrationCleanup(m *IndexMigration) {
	m.removeDanglingForeignKeys = true
}
// NewIndexMigration constructs a migration which creates and populates the
// index described by the provided mapping. The supplied options may override
// the default batch size and cleanup behavior.
func NewIndexMigration(mapping IndexMapping, opts ...IndexMigrationOption) *IndexMigration {
	migration := &IndexMigration{
		IndexMapping:       mapping,
		operationBatchSize: defaultIndexMigrationOpBatchSize,
	}

	for _, opt := range opts {
		opt(migration)
	}

	return migration
}
// MigrationName returns a readable name for the index migration,
// derived from the index bucket name.
func (i *IndexMigration) MigrationName() string {
	return fmt.Sprintf("add index %q", string(i.IndexBucket()))
}
// Up creates the index bucket and then populates it from the source bucket.
// Any failure is wrapped with the migration name for context.
func (i *IndexMigration) Up(ctx context.Context, store SchemaStore) error {
	if err := store.CreateBucket(ctx, i.IndexBucket()); err != nil {
		return fmt.Errorf("migration (up) %s: %w", i.MigrationName(), err)
	}

	if _, err := i.Populate(ctx, store); err != nil {
		return fmt.Errorf("migration (up) %s: %w", i.MigrationName(), err)
	}

	return nil
}
// Down removes the index by deleting the index bucket entirely.
func (i *IndexMigration) Down(ctx context.Context, store SchemaStore) error {
	err := store.DeleteBucket(ctx, i.IndexBucket())
	if err != nil {
		return fmt.Errorf("migration (down) %s: %w", i.MigrationName(), err)
	}
	return nil
}
// Populate does a full population of the index using the IndexSourceOn IndexMapping function.
// Once completed it marks the index as ready for use.
// It returns a nil error on success and the count of inserted items.
// Inserts are committed in batches of operationBatchSize entries per transaction.
func (i *IndexMigration) Populate(ctx context.Context, store Store) (n int, err error) {
	// verify the index to derive missing index
	// we can skip missing source lookup as we're
	// only interested in populating the missing index
	diff, err := indexVerify(ctx, i, store, i.removeDanglingForeignKeys)
	if err != nil {
		return 0, fmt.Errorf("looking up missing indexes: %w", err)
	}

	// flush commits one batch of index entries in a single update
	// transaction; it increments the captured counter n as it goes.
	flush := func(batch kvSlice) error {
		if len(batch) == 0 {
			return nil
		}

		if err := store.Update(ctx, func(tx Tx) error {
			indexBucket, err := tx.Bucket(i.IndexBucket())
			if err != nil {
				return err
			}

			for _, pair := range batch {
				// insert missing item into index
				if err := indexBucket.Put(pair[0], pair[1]); err != nil {
					return err
				}

				n++
			}

			return nil
		}); err != nil {
			return fmt.Errorf("updating index: %w", err)
		}

		return nil
	}

	var batch kvSlice

	// accumulate the missing (index key, primary key) pairs, flushing a
	// transaction each time the configured batch size is reached.
	for fk, fkm := range diff.MissingFromIndex {
		for pk := range fkm {
			batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)})
			if len(batch) >= i.operationBatchSize {
				if err := flush(batch); err != nil {
					return n, err
				}

				// reuse the batch's backing array for the next chunk
				batch = batch[:0]
			}
		}
	}

	// flush any remaining partial batch
	if err := flush(batch); err != nil {
		return n, err
	}

	// optionally delete index entries whose source items no longer exist
	if i.removeDanglingForeignKeys {
		return n, i.remove(ctx, store, diff.MissingFromSource)
	}

	return n, nil
}
// remove deletes the index entries described by mappings (a set of
// foreign key -> primary key sets), committing deletions in batches of
// operationBatchSize entries per transaction.
func (i *IndexMigration) remove(ctx context.Context, store Store, mappings map[string]map[string]struct{}) error {
	var (
		batch [][]byte
		// flush deletes one batch of index keys in a single update transaction.
		flush = func(batch [][]byte) error {
			if len(batch) == 0 {
				return nil
			}

			if err := store.Update(ctx, func(tx Tx) error {
				indexBucket, err := tx.Bucket(i.IndexBucket())
				if err != nil {
					return err
				}

				for _, indexKey := range batch {
					// delete dangling foreign key
					if err := indexBucket.Delete(indexKey); err != nil {
						return err
					}
				}

				return nil
			}); err != nil {
				return fmt.Errorf("removing dangling foreign keys: %w", err)
			}

			return nil
		}
	)

	// collect index keys to delete, flushing whenever a batch fills up
	for fk, fkm := range mappings {
		for pk := range fkm {
			batch = append(batch, indexKey([]byte(fk), []byte(pk)))
			if len(batch) >= i.operationBatchSize {
				if err := flush(batch); err != nil {
					return err
				}

				// reuse the batch's backing array for the next chunk
				batch = batch[:0]
			}
		}
	}

	// flush any remaining partial batch
	return flush(batch)
}

View File

@ -14,7 +14,13 @@ import (
)
func Test_Inmem_Index(t *testing.T) {
influxdbtesting.TestIndex(t, inmem.NewKVStore())
s, closeStore, err := NewTestInmemStore(t)
if err != nil {
t.Fatal(err)
}
defer closeStore()
influxdbtesting.TestIndex(t, s)
}
func Test_Bolt_Index(t *testing.T) {

99
kv/initial_migration.go Normal file
View File

@ -0,0 +1,99 @@
package kv
import (
"context"
"encoding/json"
influxdb "github.com/influxdata/influxdb/v2"
)
type InitialMigration struct{}
// MigrationName returns the name "initial migration",
// which allows this type to be used as a migration.
func (m InitialMigration) MigrationName() string {
	return "initial migration"
}
// Up initializes all the owned buckets of the underlying store and then
// seeds the default source. It returns the first bucket-creation error
// encountered, if any.
func (m InitialMigration) Up(ctx context.Context, store SchemaStore) error {
	// please do not initialize anymore buckets here
	// add them as a new migration to the list of migrations
	// defined in NewInitialMigration.
	for _, bucket := range [][]byte{
		authBucket,
		authIndex,
		bucketBucket,
		bucketIndex,
		dashboardBucket,
		orgDashboardIndex,
		dashboardCellViewBucket,
		kvlogBucket,
		kvlogIndex,
		labelBucket,
		labelMappingBucket,
		labelIndex,
		onboardingBucket,
		organizationBucket,
		organizationIndex,
		taskBucket,
		taskRunBucket,
		taskIndexBucket,
		userpasswordBucket,
		scrapersBucket,
		secretBucket,
		sessionBucket,
		telegrafBucket,
		telegrafPluginsBucket,
		urmBucket,
		notificationRuleBucket,
		userBucket,
		userIndex,
		sourceBucket,
		// these are the "document" (aka templates) key prefixes
		[]byte("templates/documents/content"),
		[]byte("templates/documents/meta"),
		// store base backed services
		checkBucket,
		checkIndexBucket,
		notificationEndpointBucket,
		notificationEndpointIndexBucket,
		variableBucket,
		variableIndexBucket,
		variableOrgsIndex,
	} {
		if err := store.CreateBucket(ctx, bucket); err != nil {
			return err
		}
	}

	// seed initial sources (default source)
	return store.Update(ctx, func(tx Tx) error {
		return putAsJson(tx, sourceBucket, DefaultSource.ID, DefaultSource)
	})
}
// Down is a no-op; the initial buckets are never torn down. It exists only
// so InitialMigration can be used as a migration.
func (m InitialMigration) Down(ctx context.Context, store SchemaStore) error {
	return nil
}
// putAsJson JSON-encodes value and stores it in the given bucket keyed by
// the encoded form of id.
func putAsJson(tx Tx, bucket []byte, id influxdb.ID, value interface{}) error {
	payload, err := json.Marshal(value)
	if err != nil {
		return err
	}

	key, err := id.Encode()
	if err != nil {
		return err
	}

	bkt, err := tx.Bucket(bucket)
	if err != nil {
		return err
	}

	return bkt.Put(key, payload)
}

View File

@ -10,22 +10,29 @@ import (
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"go.uber.org/zap/zaptest"
)
func NewTestBoltStore(t *testing.T) (kv.Store, func(), error) {
func NewTestBoltStore(t *testing.T) (kv.SchemaStore, func(), error) {
f, err := ioutil.TempFile("", "influxdata-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
}
f.Close()
ctx := context.Background()
logger := zaptest.NewLogger(t)
path := f.Name()
s := bolt.NewKVStore(zaptest.NewLogger(t), path)
s := bolt.NewKVStore(logger, path)
if err := s.Open(context.Background()); err != nil {
return nil, nil, err
}
if err := all.Up(ctx, logger, s); err != nil {
return nil, nil, err
}
close := func() {
s.Close()
os.Remove(path)
@ -34,6 +41,10 @@ func NewTestBoltStore(t *testing.T) (kv.Store, func(), error) {
return s, close, nil
}
func NewTestInmemStore(t *testing.T) (kv.Store, func(), error) {
return inmem.NewKVStore(), func() {}, nil
func NewTestInmemStore(t *testing.T) (kv.SchemaStore, func(), error) {
store := inmem.NewKVStore()
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
return nil, nil, err
}
return store, func() {}, nil
}

View File

@ -98,16 +98,6 @@ func encodeKeyValueIndexKey(k []byte) []byte {
return h.Sum(nil)
}
func (s *Service) initializeKVLog(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(kvlogBucket); err != nil {
return err
}
if _, err := tx.Bucket(kvlogIndex); err != nil {
return err
}
return nil
}
var errKeyValueLogBoundsNotFound = &platform.Error{
Code: platform.ENotFound,
Msg: "oplog not found",

View File

@ -27,13 +27,9 @@ func initBoltKeyValueLog(f influxdbtesting.KeyValueLogFields, t *testing.T) (inf
}
}
func initKeyValueLog(s kv.Store, f influxdbtesting.KeyValueLogFields, t *testing.T) (influxdb.KeyValueLog, func()) {
svc := kv.NewService(zaptest.NewLogger(t), s)
func initKeyValueLog(s kv.SchemaStore, f influxdbtesting.KeyValueLogFields, t *testing.T) (influxdb.KeyValueLog, func()) {
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing organization service: %v", err)
}
svc := kv.NewService(zaptest.NewLogger(t), s)
for _, e := range f.LogEntries {
if err := svc.AddLogEntry(ctx, e.Key, e.Value, e.Time); err != nil {

View File

@ -16,22 +16,6 @@ var (
labelIndex = []byte("labelindexv1")
)
func (s *Service) initializeLabels(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(labelBucket); err != nil {
return err
}
if _, err := tx.Bucket(labelMappingBucket); err != nil {
return err
}
if _, err := tx.Bucket(labelIndex); err != nil {
return err
}
return nil
}
// FindLabelByID finds a label by its ID
func (s *Service) FindLabelByID(ctx context.Context, id influxdb.ID) (*influxdb.Label, error) {
var l *influxdb.Label

View File

@ -27,14 +27,11 @@ func initBoltLabelService(f influxdbtesting.LabelFields, t *testing.T) (influxdb
}
}
func initLabelService(s kv.Store, f influxdbtesting.LabelFields, t *testing.T) (influxdb.LabelService, string, func()) {
func initLabelService(s kv.SchemaStore, f influxdbtesting.LabelFields, t *testing.T) (influxdb.LabelService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing label service: %v", err)
}
for _, l := range f.Labels {
if err := svc.PutLabel(ctx, l); err != nil {
t.Fatalf("failed to populate labels: %v", err)

View File

@ -17,7 +17,7 @@ var (
nonexistantID = influxdb.ID(10001)
)
type StoreFn func(*testing.T) (kv.Store, func(), error)
type StoreFn func(*testing.T) (kv.SchemaStore, func(), error)
func TestLookupService_Name_WithBolt(t *testing.T) {
testLookupName(NewTestBoltStore, t)

View File

@ -0,0 +1,6 @@
package all

import "github.com/influxdata/influxdb/v2/kv"

// Migration0001_InitialMigration contains all the buckets created before the
// time of migrations in kv. Delegating to kv.InitialMigration keeps the
// pre-migration schema definition alongside the store implementation.
var Migration0001_InitialMigration = kv.InitialMigration{}

View File

@ -0,0 +1,6 @@
package all

import "github.com/influxdata/influxdb/v2/kv"

// Migration0002_AddURMByUserIndex creates the URM by user index and populates
// missing entries based on the source. WithIndexMigrationCleanup removes any
// stale index entries while back-filling.
var Migration0002_AddURMByUserIndex = kv.NewIndexMigration(kv.URMByUserIndexMapping, kv.WithIndexMigrationCleanup)

View File

@ -0,0 +1,212 @@
package all
import (
"context"
"encoding/json"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
)
// taskBucket is the kv bucket in which task documents are persisted.
var taskBucket = []byte("tasksv1")

// Migration0003_TaskOwnerIDUpMigration adds missing owner IDs to some legacy tasks.
// For each task without a valid OwnerID it attempts to resolve the owner first
// from the task's recorded authorization, then from the organization-owner user
// resource mapping; if neither yields a valid owner the migration fails.
var Migration0003_TaskOwnerIDUpMigration = UpOnlyMigration(
	"migration task owner id",
	func(ctx context.Context, store kv.SchemaStore) error {
		var ownerlessTasks []*influxdb.Task
		// loop through the tasks and collect a set of tasks that are missing the owner id.
		err := store.View(ctx, func(tx kv.Tx) error {
			// NOTE: this local taskBucket (a kv.Bucket) deliberately shadows the
			// package-level taskBucket name (the []byte bucket key) inside this closure.
			taskBucket, err := tx.Bucket(taskBucket)
			if err != nil {
				return influxdb.ErrUnexpectedTaskBucketErr(err)
			}

			c, err := taskBucket.ForwardCursor([]byte{})
			if err != nil {
				return influxdb.ErrUnexpectedTaskBucketErr(err)
			}

			for k, v := c.Next(); k != nil; k, v = c.Next() {
				kvTask := &kvTask{}
				if err := json.Unmarshal(v, kvTask); err != nil {
					return influxdb.ErrInternalTaskServiceError(err)
				}

				t := kvToInfluxTask(kvTask)
				if !t.OwnerID.Valid() {
					ownerlessTasks = append(ownerlessTasks, t)
				}
			}
			if err := c.Err(); err != nil {
				return err
			}

			return c.Close()
		})
		if err != nil {
			return err
		}

		// loop through tasks
		for _, t := range ownerlessTasks {
			// open transaction
			err := store.Update(ctx, func(tx kv.Tx) error {
				// NOTE: local taskKey shadows the package-level taskKey function
				// after this assignment; the function is only called on this line.
				taskKey, err := taskKey(t.ID)
				if err != nil {
					return err
				}
				b, err := tx.Bucket(taskBucket)
				if err != nil {
					return influxdb.ErrUnexpectedTaskBucketErr(err)
				}

				if !t.OwnerID.Valid() {
					v, err := b.Get(taskKey)
					if kv.IsNotFound(err) {
						return influxdb.ErrTaskNotFound
					}
					// only the authorizationID field of the stored task document
					// is needed here, so decode into a narrow anonymous struct.
					authType := struct {
						AuthorizationID influxdb.ID `json:"authorizationID"`
					}{}
					if err := json.Unmarshal(v, &authType); err != nil {
						return influxdb.ErrInternalTaskServiceError(err)
					}

					// try populating the owner from auth
					encodedID, err := authType.AuthorizationID.Encode()
					if err == nil {
						authBucket, err := tx.Bucket([]byte("authorizationsv1"))
						if err != nil {
							return err
						}

						// best-effort lookup: a missing authorization is not an
						// error here, the URM fallback below is tried next.
						a, err := authBucket.Get(encodedID)
						if err == nil {
							auth := &influxdb.Authorization{}
							if err := json.Unmarshal(a, auth); err != nil {
								return err
							}

							t.OwnerID = auth.GetUserID()
						}
					}
				}

				// try populating owner from urm
				if !t.OwnerID.Valid() {
					b, err := tx.Bucket([]byte("userresourcemappingsv1"))
					if err != nil {
						return err
					}

					id, err := t.OrganizationID.Encode()
					if err != nil {
						return err
					}

					// scan URMs prefixed by the org ID for an org owner mapping.
					cur, err := b.ForwardCursor(id, kv.WithCursorPrefix(id))
					if err != nil {
						return err
					}

					for k, v := cur.Next(); k != nil; k, v = cur.Next() {
						m := &influxdb.UserResourceMapping{}
						if err := json.Unmarshal(v, m); err != nil {
							return err
						}
						if m.ResourceID == t.OrganizationID && m.ResourceType == influxdb.OrgsResourceType && m.UserType == influxdb.Owner {
							t.OwnerID = m.UserID
							break
						}
					}
					if err := cur.Close(); err != nil {
						return err
					}
				}

				// if population fails return error
				if !t.OwnerID.Valid() {
					return &influxdb.Error{
						Code: influxdb.EInternal,
						Msg:  "could not populate owner ID for task",
					}
				}

				// save task
				taskBytes, err := json.Marshal(t)
				if err != nil {
					return influxdb.ErrInternalTaskServiceError(err)
				}

				err = b.Put(taskKey, taskBytes)
				if err != nil {
					return influxdb.ErrUnexpectedTaskBucketErr(err)
				}
				return nil
			})
			if err != nil {
				return err
			}
		}

		return nil
	},
)
// kvTask is the persisted JSON representation of a task as stored in the
// tasksv1 bucket at the time of this migration. It is kept here (rather than
// referencing the task service's type) so the migration remains stable even
// if the live task model changes later.
type kvTask struct {
	ID              influxdb.ID            `json:"id"`
	Type            string                 `json:"type,omitempty"`
	OrganizationID  influxdb.ID            `json:"orgID"`
	Organization    string                 `json:"org"`
	OwnerID         influxdb.ID            `json:"ownerID"`
	Name            string                 `json:"name"`
	Description     string                 `json:"description,omitempty"`
	Status          string                 `json:"status"`
	Flux            string                 `json:"flux"`
	Every           string                 `json:"every,omitempty"`
	Cron            string                 `json:"cron,omitempty"`
	LastRunStatus   string                 `json:"lastRunStatus,omitempty"`
	LastRunError    string                 `json:"lastRunError,omitempty"`
	Offset          influxdb.Duration      `json:"offset,omitempty"`
	LatestCompleted time.Time              `json:"latestCompleted,omitempty"`
	LatestScheduled time.Time              `json:"latestScheduled,omitempty"`
	CreatedAt       time.Time              `json:"createdAt,omitempty"`
	UpdatedAt       time.Time              `json:"updatedAt,omitempty"`
	Metadata        map[string]interface{} `json:"metadata,omitempty"`
}
// kvToInfluxTask converts the persisted kvTask document into the public
// influxdb.Task model. All fields are copied verbatim except Offset, which
// unwraps the influxdb.Duration into its time.Duration value.
func kvToInfluxTask(k *kvTask) *influxdb.Task {
	var t influxdb.Task

	t.ID = k.ID
	t.Type = k.Type
	t.OrganizationID = k.OrganizationID
	t.Organization = k.Organization
	t.OwnerID = k.OwnerID
	t.Name = k.Name
	t.Description = k.Description
	t.Status = k.Status
	t.Flux = k.Flux
	t.Every = k.Every
	t.Cron = k.Cron
	t.LastRunStatus = k.LastRunStatus
	t.LastRunError = k.LastRunError
	t.Offset = k.Offset.Duration
	t.LatestCompleted = k.LatestCompleted
	t.LatestScheduled = k.LatestScheduled
	t.CreatedAt = k.CreatedAt
	t.UpdatedAt = k.UpdatedAt
	t.Metadata = k.Metadata

	return &t
}
// taskKey derives the kv bucket key for a task from its ID.
// An ID that cannot be encoded yields influxdb.ErrInvalidTaskID.
func taskKey(taskID influxdb.ID) ([]byte, error) {
	key, err := taskID.Encode()
	if err == nil {
		return key, nil
	}

	return nil, influxdb.ErrInvalidTaskID
}

View File

@ -0,0 +1,147 @@
package all
import (
"context"
"fmt"
"testing"
"github.com/benbjohnson/clock"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
"go.uber.org/zap/zaptest"
)
// Test_ exercises Migration0003_TaskOwnerIDUpMigration against two legacy task
// shapes: one whose owner must be recovered from its authorization record, and
// one (no auth reference) whose owner must be recovered from the org-owner URM.
func Test_(t *testing.T) {
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()
	ts := newService(t, ctx)

	taskBucket := []byte("tasksv1")
	id := "05da585043e02000"
	// create a task that has auth set and no ownerID
	err := ts.Store.Update(context.Background(), func(tx kv.Tx) error {
		b, err := tx.Bucket(taskBucket)
		if err != nil {
			t.Fatal(err)
		}
		// raw pre-migration task document referencing ts.Auth by authorizationID.
		taskBody := fmt.Sprintf(`{"id":"05da585043e02000","type":"system","orgID":"05d3ae3492c9c000","org":"whos","authorizationID":"%s","name":"asdf","status":"active","flux":"option v = {\n  bucket: \"bucks\",\n  timeRangeStart: -1h,\n  timeRangeStop: now()\n}\n\noption task = { \n  name: \"asdf\",\n  every: 5m,\n}\n\nfrom(bucket: \"_monitoring\")\n  |\u003e range(start: v.timeRangeStart, stop: v.timeRangeStop)\n  |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"boltdb_reads_total\")\n  |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"counter\")\n  |\u003e to(bucket: \"bucks\", org: \"whos\")","every":"5m","latestCompleted":"2020-06-16T17:01:26.083319Z","latestScheduled":"2020-06-16T17:01:26.083319Z","lastRunStatus":"success","createdAt":"2020-06-15T19:10:29Z","updatedAt":"0001-01-01T00:00:00Z"}`, ts.Auth.ID.String())
		err = b.Put([]byte(id), []byte(taskBody))
		if err != nil {
			t.Fatal(err)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	err = Migration0003_TaskOwnerIDUpMigration.Up(context.Background(), ts.Store)
	if err != nil {
		t.Fatal(err)
	}

	idType, _ := influxdb.IDFromString(id)
	task, err := ts.Service.FindTaskByID(context.Background(), *idType)
	if err != nil {
		t.Fatal(err)
	}

	if task.OwnerID != ts.User.ID {
		t.Fatal("failed to fill in ownerID")
	}

	// create a task that has no auth or owner id but a urm exists
	err = ts.Store.Update(context.Background(), func(tx kv.Tx) error {
		b, err := tx.Bucket([]byte("tasksv1"))
		if err != nil {
			t.Fatal(err)
		}
		// raw pre-migration task document with no authorizationID; the owner
		// must be resolved via the org-owner user resource mapping instead.
		taskBody := fmt.Sprintf(`{"id":"05da585043e02000","type":"system","orgID":"%s","org":"whos","name":"asdf","status":"active","flux":"option v = {\n  bucket: \"bucks\",\n  timeRangeStart: -1h,\n  timeRangeStop: now()\n}\n\noption task = { \n  name: \"asdf\",\n  every: 5m,\n}\n\nfrom(bucket: \"_monitoring\")\n  |\u003e range(start: v.timeRangeStart, stop: v.timeRangeStop)\n  |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"boltdb_reads_total\")\n  |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"counter\")\n  |\u003e to(bucket: \"bucks\", org: \"whos\")","every":"5m","latestCompleted":"2020-06-16T17:01:26.083319Z","latestScheduled":"2020-06-16T17:01:26.083319Z","lastRunStatus":"success","createdAt":"2020-06-15T19:10:29Z","updatedAt":"0001-01-01T00:00:00Z"}`, ts.Org.ID.String())
		err = b.Put([]byte(id), []byte(taskBody))
		if err != nil {
			t.Fatal(err)
		}
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}

	err = Migration0003_TaskOwnerIDUpMigration.Up(context.Background(), ts.Store)
	if err != nil {
		t.Fatal(err)
	}

	task, err = ts.Service.FindTaskByID(context.Background(), *idType)
	if err != nil {
		t.Fatal(err)
	}

	if task.OwnerID != ts.User.ID {
		t.Fatal("failed to fill in ownerID")
	}
}
// testService bundles a kv store, a kv.Service wired against it, and the
// pre-created org/user/auth fixture used by the migration tests in this package.
type testService struct {
	Store   kv.SchemaStore
	Service *kv.Service
	Org     influxdb.Organization
	User    influxdb.User
	Auth    influxdb.Authorization
	Clock   clock.Clock
}
// newService constructs a testService backed by an in-memory kv store with all
// migrations applied up to (but not including) Migration0003, then seeds it
// with a user, an organization, an org-owner URM, and an authorization so the
// owner-ID back-fill has data to resolve against.
func newService(t *testing.T, ctx context.Context) *testService {
	t.Helper()

	var (
		ts = &testService{
			Store: inmem.NewKVStore(),
		}
		logger = zaptest.NewLogger(t)
	)

	// apply migrations up to (but not including) this one
	migrator, err := migration.NewMigrator(logger, ts.Store, Migrations[:2]...)
	if err != nil {
		t.Fatal(err)
	}

	if err := migrator.Up(ctx); err != nil {
		t.Fatal(err)
	}

	ts.Service = kv.NewService(logger, ts.Store)

	ts.User = influxdb.User{Name: t.Name() + "-user"}
	if err := ts.Service.CreateUser(ctx, &ts.User); err != nil {
		t.Fatal(err)
	}

	ts.Org = influxdb.Organization{Name: t.Name() + "-org"}
	if err := ts.Service.CreateOrganization(ctx, &ts.Org); err != nil {
		t.Fatal(err)
	}

	// make the user an owner of the org so the URM fallback path can succeed.
	if err := ts.Service.CreateUserResourceMapping(ctx, &influxdb.UserResourceMapping{
		ResourceType: influxdb.OrgsResourceType,
		ResourceID:   ts.Org.ID,
		UserID:       ts.User.ID,
		UserType:     influxdb.Owner,
	}); err != nil {
		t.Fatal(err)
	}

	ts.Auth = influxdb.Authorization{
		OrgID:       ts.Org.ID,
		UserID:      ts.User.ID,
		Permissions: influxdb.OperPermissions(),
	}
	if err := ts.Service.CreateAuthorization(context.Background(), &ts.Auth); err != nil {
		t.Fatal(err)
	}

	return ts
}

View File

@ -0,0 +1,17 @@
package all

import "github.com/influxdata/influxdb/v2/kv/migration"

// Bucket names backing the DBRP service's kv stores.
var (
	dbrpBucket        = []byte("dbrpv1")
	dbrpIndexBucket   = []byte("dbrpbyorganddbindexv1")
	dbrpDefaultBucket = []byte("dbrpdefaultv1")
)

// Migration0004_AddDbrpBuckets creates the buckets necessary for the DBRP Service to operate.
var Migration0004_AddDbrpBuckets = migration.CreateBuckets(
	"create DBRP buckets",
	dbrpBucket,
	dbrpIndexBucket,
	dbrpDefaultBucket,
)

View File

@ -0,0 +1,15 @@
package all

import "github.com/influxdata/influxdb/v2/kv/migration"

// Bucket names backing the pkger (stacks) service's kv stores.
var (
	pkgerStacksBucket     = []byte("v1_pkger_stacks")
	pkgerStackIndexBucket = []byte("v1_pkger_stacks_index")
)

// Migration0005_AddPkgerBuckets creates the buckets necessary for the pkger service to operate.
var Migration0005_AddPkgerBuckets = migration.CreateBuckets(
	"create pkger stacks buckets",
	pkgerStacksBucket,
	pkgerStackIndexBucket,
)

21
kv/migration/all/all.go Normal file
View File

@ -0,0 +1,21 @@
package all

import (
	"github.com/influxdata/influxdb/v2/kv/migration"
)

// Migrations contains all the migrations required for the entire of the
// kv store backing influxdb's metadata.
//
// Order matters: the migrator applies these serially and records each by its
// position, so entries must only ever be appended, never reordered or removed.
// The trailing template marker is consumed by the kvmigrate `create` command
// to append new entries automatically — do not edit or move it.
var Migrations = [...]migration.Spec{
	// initial migrations
	Migration0001_InitialMigration,
	// add index user resource mappings by user id
	Migration0002_AddURMByUserIndex,
	// add index for tasks with missing owner IDs
	Migration0003_TaskOwnerIDUpMigration,
	// add dbrp buckets
	Migration0004_AddDbrpBuckets,
	// add pkger buckets
	Migration0005_AddPkgerBuckets,
	// {{ do_not_edit . }}
}

22
kv/migration/all/doc.go Normal file
View File

@ -0,0 +1,22 @@
// package all
//
// This package is the canonical location for all migrations being made against the
// single shared kv.Store implementation used by InfluxDB (while it remains a single store).
//
// The array all.Migrations contains the list of migration specifications which drives the
// serial set of migration operations required to correctly configure the backing metadata store
// for InfluxDB.
//
// This package is arranged like so:
//
//     doc.go - this piece of documentation.
//     all.go - definition of the Migrations array, referencing each of the named migrations in the numbered migration files (below).
//     migration.go - an implementation of migration.Spec for convenience.
//     000X_migration_name.go (example) - N files containing the specific implementations of each migration enumerated in `all.go`.
//     ...
//
// Managing this list of files and all.go can be fiddly.
// There is a buildable cli utility called `kvmigrate` in the `internal/cmd/kvmigrate` package.
// This has a command `create` which automatically creates a new migration in the expected location
// and appends it appropriately into the all.go Migrations array.
package all

View File

@ -0,0 +1,55 @@
package all

import (
	"context"

	"github.com/influxdata/influxdb/v2/kv"
	"github.com/influxdata/influxdb/v2/kv/migration"
	"go.uber.org/zap"
)

// Up is a convenience method which creates a migrator for all
// migrations and calls Up on it.
func Up(ctx context.Context, logger *zap.Logger, store kv.SchemaStore) error {
	migrator, err := migration.NewMigrator(logger, store, Migrations[:]...)
	if err != nil {
		return err
	}

	return migrator.Up(ctx)
}

// MigrationFunc is a function which can be used as either an up or down operation.
type MigrationFunc func(context.Context, kv.SchemaStore) error

// noopMigration is a MigrationFunc which does nothing; it serves as the down
// operation for migrations which cannot (or need not) be reversed.
func noopMigration(context.Context, kv.SchemaStore) error {
	return nil
}

// Migration is a type which implements the migration package's Spec interface.
// It can be used to conveniently create migration specs for the all package.
type Migration struct {
	name string
	up   MigrationFunc
	down MigrationFunc
}

// UpOnlyMigration is a migration with an up function and a noop down function.
func UpOnlyMigration(name string, up MigrationFunc) *Migration {
	return &Migration{name, up, noopMigration}
}

// MigrationName returns the underlying name of the migration.
func (m *Migration) MigrationName() string {
	return m.name
}

// Up delegates to the underlying anonymous up migration function.
func (m *Migration) Up(ctx context.Context, store kv.SchemaStore) error {
	return m.up(ctx, store)
}

// Down delegates to the underlying anonymous down migration function.
func (m *Migration) Down(ctx context.Context, store kv.SchemaStore) error {
	return m.down(ctx, store)
}

47
kv/migration/buckets.go Normal file
View File

@ -0,0 +1,47 @@
package migration

import (
	"context"

	"github.com/influxdata/influxdb/v2/kv"
)

// CreateBucketsMigration is a migration Spec which creates
// the provided list of buckets on a store when Up is called
// and deletes them on Down.
type CreateBucketsMigration struct {
	name    string
	buckets [][]byte
}

// CreateBuckets returns a new CreateBucketsMigration Spec.
func CreateBuckets(name string, buckets ...[]byte) Spec {
	return CreateBucketsMigration{name, buckets}
}

// MigrationName returns the name of the migration.
func (c CreateBucketsMigration) MigrationName() string {
	return c.name
}

// Up creates the buckets on the store.
// Bucket creation stops at the first error; buckets already created in this
// call are not rolled back.
func (c CreateBucketsMigration) Up(ctx context.Context, store kv.SchemaStore) error {
	for _, bucket := range c.buckets {
		if err := store.CreateBucket(ctx, bucket); err != nil {
			return err
		}
	}
	return nil
}

// Down deletes the buckets on the store.
func (c CreateBucketsMigration) Down(ctx context.Context, store kv.SchemaStore) error {
	for _, bucket := range c.buckets {
		if err := store.DeleteBucket(ctx, bucket); err != nil {
			return err
		}
	}
	return nil
}

74
kv/migration/create.go Normal file
View File

@ -0,0 +1,74 @@
package migration

import (
	"bytes"
	"fmt"
	"go/format"
	"io/ioutil"
	"strings"
	"text/template"
)

// newMigrationFmt is the skeleton written into a freshly created migration
// file; the %s verb is replaced with the generated migration variable name.
const newMigrationFmt = `package all

var %s = &Migration{}
`

// CreateNewMigration persists a new migration file in the appropriate location
// and updates the appropriate all.go list of migrations.
//
// It derives the next migration number from the count of existing specs,
// writes ./kv/migration/all/NNNN_<name>.go containing an empty *Migration,
// and re-renders ./kv/migration/all/all.go so the new variable is appended
// just before the `// {{ do_not_edit . }}` marker.
func CreateNewMigration(existing []Spec, name string) error {
	camelName := strings.Replace(strings.Title(name), " ", "", -1)

	newMigrationNumber := len(existing) + 1

	newMigrationVariable := fmt.Sprintf("Migration%04d_%s", newMigrationNumber, camelName)

	newMigrationFile := fmt.Sprintf("./kv/migration/all/%04d_%s.go", newMigrationNumber, strings.Replace(name, " ", "-", -1))

	fmt.Println("Creating new migration:", newMigrationFile)

	if err := ioutil.WriteFile(newMigrationFile, []byte(fmt.Sprintf(newMigrationFmt, newMigrationVariable)), 0644); err != nil {
		return err
	}

	fmt.Println("Inserting migration into ./kv/migration/all/all.go")

	tmplData, err := ioutil.ReadFile("./kv/migration/all/all.go")
	if err != nil {
		return err
	}

	// Context carries the data exposed to the do_not_edit template function.
	type Context struct {
		Name     string
		Variable string
	}

	// NOTE: this must be text/template, not html/template — we are generating
	// Go source, and html/template's contextual auto-escaping would corrupt
	// any output containing characters it treats as HTML-significant.
	tmpl := template.Must(
		template.
			New("migrations").
			Funcs(template.FuncMap{"do_not_edit": func(c Context) string {
				// replace the marker with the new entry, then re-emit the
				// marker so the next invocation can find it again.
				return fmt.Sprintf("%s\n%s,\n// {{ do_not_edit . }}", c.Name, c.Variable)
			}}).
			Parse(string(tmplData)),
	)

	buf := new(bytes.Buffer)
	if err := tmpl.Execute(buf, Context{
		Name:     name,
		Variable: newMigrationVariable,
	}); err != nil {
		return err
	}

	// gofmt the rendered source before writing it back.
	src, err := format.Source(buf.Bytes())
	if err != nil {
		return err
	}

	if err := ioutil.WriteFile("./kv/migration/all/all.go", src, 0644); err != nil {
		return err
	}

	return nil
}

28
kv/migration/doc.go Normal file
View File

@ -0,0 +1,28 @@
// package migration
//
// This package contains utility types for composing and running schema and data migrations
// in a strictly serial and ordered nature; against a backing kv.SchemaStore implementation.
//
// The goal is to provide a mechanism to ensure an ordered set of changes are applied once
// and only once to a persisted kv store. This lets each migration make guarantees
// based on the mutations of the previous migrations.
//
// The package offers the `Migrator` type which takes a slice of `Spec` implementations.
// A spec is a single migration definition, which exposes a name, up and down operations
// expressed as an Up and Down function on the Spec implementation.
//
// The `Migrator` on a call to `Up(ctx)` applies these defined list of migrations' respective `Up(...)` functions
// on a `kv.SchemaStore` in order and persists their invocation on the store in a reserved Bucket `migrationsv1`.
// This ensures the only-once invocation of each migration and allows the resuming or introduction
// of new migrations at a later date.
// This means the defined list needs to remain static from the point of application. Otherwise an error will be raised.
//
// This package also offers utility types for quickly defining common changes as specifications.
// For example bucket creation, which can be quickly constructed via `migration.CreateBuckets("create buckets ...", []byte("foo"), []byte("bar"))`.
//
// As of today all migrations can be found in a single definition in the sub-package to this one
// named `all` (see `kv/migration/all/all.go`).
// The `migration.CreateNewMigration()` method can be used to manipulate this `all.go` file in the package and quickly
// add a new migration file to be populated. This is accessible on the command line via the `internal/cmd/kvmigrate` buildable go tool.
// Try `go run internal/cmd/kvmigrate/main.go`.
package migration

View File

@ -1,4 +1,4 @@
package kv
package migration
import (
"context"
@ -8,6 +8,7 @@ import (
"time"
influxdb "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"go.uber.org/zap"
)
@ -19,6 +20,8 @@ var (
ErrMigrationSpecNotFound = errors.New("migration specification not found")
)
type Store = kv.SchemaStore
// MigrationState is a type for describing the state of a migration.
type MigrationState uint
@ -50,84 +53,62 @@ type Migration struct {
FinishedAt *time.Time `json:"finished_at,omitempty"`
}
// MigrationSpec is a specification for a particular migration.
// Spec is a specification for a particular migration.
// It describes the name of the migration and up and down operations
// needed to fulfill the migration.
type MigrationSpec interface {
type Spec interface {
MigrationName() string
Up(ctx context.Context, store Store) error
Down(ctx context.Context, store Store) error
Up(ctx context.Context, store kv.SchemaStore) error
Down(ctx context.Context, store kv.SchemaStore) error
}
// MigrationFunc is a function which can be used as either an up or down operation.
type MigrationFunc func(context.Context, Store) error
// AnonymousMigration is a utility type for creating migrations from anonyomous functions.
type AnonymousMigration struct {
name string
up MigrationFunc
down MigrationFunc
}
// NewAnonymousMigration constructs a new migration from a name string and an up and a down function.
func NewAnonymousMigration(name string, up, down MigrationFunc) AnonymousMigration {
return AnonymousMigration{name, up, down}
}
// Name returns the name of the migration.
func (a AnonymousMigration) MigrationName() string { return a.name }
// Up calls the underlying up migration func.
func (a AnonymousMigration) Up(ctx context.Context, store Store) error { return a.up(ctx, store) }
// Down calls the underlying down migration func.
func (a AnonymousMigration) Down(ctx context.Context, store Store) error { return a.down(ctx, store) }
// Migrator is a type which manages migrations.
// It takes a list of migration specifications and undo (down) all or apply (up) outstanding migrations.
// It records the state of the world in store under the migrations bucket.
type Migrator struct {
logger *zap.Logger
MigrationSpecs []MigrationSpec
logger *zap.Logger
store Store
Specs []Spec
now func() time.Time
}
// NewMigrator constructs and configures a new Migrator.
func NewMigrator(logger *zap.Logger, ms ...MigrationSpec) *Migrator {
func NewMigrator(logger *zap.Logger, store Store, ms ...Spec) (*Migrator, error) {
m := &Migrator{
logger: logger,
store: store,
now: func() time.Time {
return time.Now().UTC()
},
}
// create migration bucket if it does not exist
if err := store.CreateBucket(context.Background(), migrationBucket); err != nil {
return nil, err
}
m.AddMigrations(ms...)
return m
return m, nil
}
// AddMigrations appends the provided migration specs onto the Migrator.
func (m *Migrator) AddMigrations(ms ...MigrationSpec) {
m.MigrationSpecs = append(m.MigrationSpecs, ms...)
}
// Initialize creates the migration bucket if it does not yet exist.
func (m *Migrator) Initialize(ctx context.Context, store Store) error {
return store.Update(ctx, func(tx Tx) error {
_, err := tx.Bucket(migrationBucket)
return err
})
func (m *Migrator) AddMigrations(ms ...Spec) {
m.Specs = append(m.Specs, ms...)
}
// List returns a list of migrations and their states within the provided store.
func (m *Migrator) List(ctx context.Context, store Store) (migrations []Migration, _ error) {
if err := m.walk(ctx, store, func(id influxdb.ID, m Migration) {
func (m *Migrator) List(ctx context.Context) (migrations []Migration, _ error) {
if err := m.walk(ctx, m.store, func(id influxdb.ID, m Migration) {
migrations = append(migrations, m)
}); err != nil {
return nil, err
}
migrationsLen := len(migrations)
for idx, spec := range m.MigrationSpecs[migrationsLen:] {
for idx, spec := range m.Specs[migrationsLen:] {
migration := Migration{
ID: influxdb.ID(migrationsLen + idx + 1),
Name: spec.MigrationName(),
@ -148,7 +129,7 @@ func (m *Migrator) List(ctx context.Context, store Store) (migrations []Migratio
// 0003 add index "foo on baz" | (down)
//
// Up would apply migration 0002 and then 0003.
func (m *Migrator) Up(ctx context.Context, store Store) error {
func (m *Migrator) Up(ctx context.Context) error {
wrapErr := func(err error) error {
if err == nil {
return nil
@ -158,7 +139,7 @@ func (m *Migrator) Up(ctx context.Context, store Store) error {
}
var lastMigration int
if err := m.walk(ctx, store, func(id influxdb.ID, mig Migration) {
if err := m.walk(ctx, m.store, func(id influxdb.ID, mig Migration) {
// we're interested in the last up migration
if mig.State == UpMigrationState {
lastMigration = int(id)
@ -167,7 +148,7 @@ func (m *Migrator) Up(ctx context.Context, store Store) error {
return wrapErr(err)
}
for idx, spec := range m.MigrationSpecs[lastMigration:] {
for idx, spec := range m.Specs[lastMigration:] {
startedAt := m.now()
migration := Migration{
ID: influxdb.ID(lastMigration + idx + 1),
@ -177,11 +158,11 @@ func (m *Migrator) Up(ctx context.Context, store Store) error {
m.logMigrationEvent(UpMigrationState, migration, "started")
if err := m.putMigration(ctx, store, migration); err != nil {
if err := m.putMigration(ctx, m.store, migration); err != nil {
return wrapErr(err)
}
if err := spec.Up(ctx, store); err != nil {
if err := spec.Up(ctx, m.store); err != nil {
return wrapErr(err)
}
@ -189,7 +170,7 @@ func (m *Migrator) Up(ctx context.Context, store Store) error {
migration.FinishedAt = &finishedAt
migration.State = UpMigrationState
if err := m.putMigration(ctx, store, migration); err != nil {
if err := m.putMigration(ctx, m.store, migration); err != nil {
return wrapErr(err)
}
@ -208,7 +189,7 @@ func (m *Migrator) Up(ctx context.Context, store Store) error {
// 0003 add index "foo on baz" | (down)
//
// Down would call down() on 0002 and then on 0001.
func (m *Migrator) Down(ctx context.Context, store Store) (err error) {
func (m *Migrator) Down(ctx context.Context) (err error) {
wrapErr := func(err error) error {
if err == nil {
return nil
@ -218,18 +199,18 @@ func (m *Migrator) Down(ctx context.Context, store Store) (err error) {
}
var migrations []struct {
MigrationSpec
Spec
Migration
}
if err := m.walk(ctx, store, func(id influxdb.ID, mig Migration) {
if err := m.walk(ctx, m.store, func(id influxdb.ID, mig Migration) {
migrations = append(
migrations,
struct {
MigrationSpec
Spec
Migration
}{
m.MigrationSpecs[int(id)-1],
m.Specs[int(id)-1],
mig,
},
)
@ -242,11 +223,11 @@ func (m *Migrator) Down(ctx context.Context, store Store) (err error) {
m.logMigrationEvent(DownMigrationState, migration.Migration, "started")
if err := migration.MigrationSpec.Down(ctx, store); err != nil {
if err := migration.Spec.Down(ctx, m.store); err != nil {
return wrapErr(err)
}
if err := m.deleteMigration(ctx, store, migration.Migration); err != nil {
if err := m.deleteMigration(ctx, m.store, migration.Migration); err != nil {
return wrapErr(err)
}
@ -260,8 +241,8 @@ func (m *Migrator) logMigrationEvent(state MigrationState, mig Migration, event
m.logger.Info(fmt.Sprintf("Migration %q %s (%s)", mig.Name, event, state))
}
func (m *Migrator) walk(ctx context.Context, store Store, fn func(id influxdb.ID, m Migration)) error {
if err := store.View(ctx, func(tx Tx) error {
func (m *Migrator) walk(ctx context.Context, store kv.Store, fn func(id influxdb.ID, m Migration)) error {
if err := store.View(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(migrationBucket)
if err != nil {
return err
@ -272,7 +253,7 @@ func (m *Migrator) walk(ctx context.Context, store Store, fn func(id influxdb.ID
return err
}
return WalkCursor(ctx, cursor, func(k, v []byte) error {
return kv.WalkCursor(ctx, cursor, func(k, v []byte) error {
var id influxdb.ID
if err := id.Decode(k); err != nil {
return fmt.Errorf("decoding migration id: %w", err)
@ -284,11 +265,11 @@ func (m *Migrator) walk(ctx context.Context, store Store, fn func(id influxdb.ID
}
idx := int(id) - 1
if idx >= len(m.MigrationSpecs) {
if idx >= len(m.Specs) {
return fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound)
}
if spec := m.MigrationSpecs[idx]; spec.MigrationName() != migration.Name {
if spec := m.Specs[idx]; spec.MigrationName() != migration.Name {
return fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name)
}
@ -307,31 +288,31 @@ func (m *Migrator) walk(ctx context.Context, store Store, fn func(id influxdb.ID
return nil
}
func (*Migrator) putMigration(ctx context.Context, store Store, m Migration) error {
return store.Update(ctx, func(tx Tx) error {
func (m *Migrator) putMigration(ctx context.Context, store kv.Store, migration Migration) error {
return store.Update(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(migrationBucket)
if err != nil {
return err
}
data, err := json.Marshal(m)
data, err := json.Marshal(migration)
if err != nil {
return err
}
id, _ := m.ID.Encode()
id, _ := migration.ID.Encode()
return bkt.Put(id, data)
})
}
func (*Migrator) deleteMigration(ctx context.Context, store Store, m Migration) error {
return store.Update(ctx, func(tx Tx) error {
func (m *Migrator) deleteMigration(ctx context.Context, store kv.Store, migration Migration) error {
return store.Update(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(migrationBucket)
if err != nil {
return err
}
id, _ := m.ID.Encode()
id, _ := migration.ID.Encode()
return bkt.Delete(id)
})
}

View File

@ -1,4 +1,4 @@
package kv
package migration
import (
"time"

View File

@ -0,0 +1,62 @@
package migration_test
import (
"context"
"errors"
"io/ioutil"
"os"
"testing"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
// newMigrator constructs a migration.Migrator over store, failing the test on
// construction error, and overrides its clock with now so the shared test
// suite gets deterministic migration timestamps.
func newMigrator(t *testing.T, logger *zap.Logger, store kv.SchemaStore, now influxdbtesting.NowFunc) *migration.Migrator {
	migrator, err := migration.NewMigrator(logger, store)
	if err != nil {
		t.Fatal(err)
	}

	migration.MigratorSetNow(migrator, now)
	return migrator
}
// Test_Inmem_Migrator runs the shared migrator test suite against the
// in-memory kv store implementation.
func Test_Inmem_Migrator(t *testing.T) {
	influxdbtesting.Migrator(t, inmem.NewKVStore(), newMigrator)
}

// Test_Bolt_Migrator runs the shared migrator test suite against a
// bolt-backed kv store on a temporary file.
func Test_Bolt_Migrator(t *testing.T) {
	store, closeBolt, err := NewTestBoltStore(t)
	if err != nil {
		t.Fatalf("failed to create new kv store: %v", err)
	}
	defer closeBolt()

	influxdbtesting.Migrator(t, store, newMigrator)
}
// NewTestBoltStore opens a bolt-backed kv.SchemaStore over a fresh temporary
// file for use in tests. The returned func closes the store and removes the
// backing file; callers must invoke it when done.
func NewTestBoltStore(t *testing.T) (kv.SchemaStore, func(), error) {
	t.Helper()

	f, err := ioutil.TempFile("", "influxdata-bolt-")
	if err != nil {
		// include the underlying cause rather than swallowing it.
		return nil, nil, errors.New("unable to open temporary boltdb file: " + err.Error())
	}
	f.Close()

	path := f.Name()
	s := bolt.NewKVStore(zaptest.NewLogger(t), path)
	if err := s.Open(context.Background()); err != nil {
		return nil, nil, err
	}

	// closeFn (renamed from `close`, which shadowed the builtin) tears down
	// the store and deletes the temp file.
	closeFn := func() {
		s.Close()
		os.Remove(path)
	}

	return s, closeFn, nil
}

View File

@ -1,30 +0,0 @@
package kv_test
import (
"testing"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap"
)
func newMigrator(logger *zap.Logger, now influxdbtesting.NowFunc) *kv.Migrator {
migrator := kv.NewMigrator(logger)
kv.MigratorSetNow(migrator, now)
return migrator
}
func Test_Inmem_Migrator(t *testing.T) {
influxdbtesting.Migrator(t, inmem.NewKVStore(), newMigrator)
}
func Test_Bolt_Migrator(t *testing.T) {
store, closeBolt, err := NewTestBoltStore(t)
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
defer closeBolt()
influxdbtesting.Migrator(t, store, newMigrator)
}

View File

@ -14,6 +14,9 @@ var (
Msg: "notification endpoint not found",
Code: influxdb.ENotFound,
}
notificationEndpointBucket = []byte("notificationEndpointv1")
notificationEndpointIndexBucket = []byte("notificationEndpointIndexv1")
)
var _ influxdb.NotificationEndpointService = (*Service)(nil)
@ -40,8 +43,8 @@ func newEndpointStore() *IndexStore {
return &IndexStore{
Resource: resource,
EntStore: NewStoreBase(resource, []byte("notificationEndpointv1"), EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, []byte("notificationEndpointIndexv1"), true),
EntStore: NewStoreBase(resource, notificationEndpointBucket, EncIDKey, EncBodyJSON, decEndpointEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, notificationEndpointIndexBucket, true),
}
}

View File

@ -28,7 +28,8 @@ func initBoltNotificationEndpointService(f influxdbtesting.NotificationEndpointF
}
}
func initNotificationEndpointService(s kv.Store, f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
func initNotificationEndpointService(s kv.SchemaStore, f influxdbtesting.NotificationEndpointFields, t *testing.T) (influxdb.NotificationEndpointService, influxdb.SecretService, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
@ -36,11 +37,6 @@ func initNotificationEndpointService(s kv.Store, f influxdbtesting.NotificationE
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing user service: %v", err)
}
for _, edp := range f.NotificationEndpoints {
if err := svc.PutNotificationEndpoint(ctx, edp); err != nil {
t.Fatalf("failed to populate notification endpoint: %v", err)

View File

@ -30,13 +30,6 @@ var (
var _ influxdb.NotificationRuleStore = (*Service)(nil)
// initializeNotificationRule ensures the notification rule bucket is
// reachable within the given transaction.
func (s *Service) initializeNotificationRule(ctx context.Context, tx Tx) error {
	_, err := s.notificationRuleBucket(tx)
	return err
}
// UnavailableNotificationRuleStoreError is used if we aren't able to interact with the
// store, it means the store is not available at the moment (e.g. network).
func UnavailableNotificationRuleStoreError(err error) *influxdb.Error {

View File

@ -28,7 +28,8 @@ func initBoltNotificationRuleStore(f influxdbtesting.NotificationRuleFields, t *
}
}
func initNotificationRuleStore(s kv.Store, f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) {
func initNotificationRuleStore(s kv.SchemaStore, f influxdbtesting.NotificationRuleFields, t *testing.T) (influxdb.NotificationRuleStore, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
@ -38,11 +39,6 @@ func initNotificationRuleStore(s kv.Store, f influxdbtesting.NotificationRuleFie
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing user service: %v", err)
}
for _, o := range f.Orgs {
if err := svc.PutOrganization(ctx, o); err != nil {
t.Fatalf("failed to populate org: %v", err)

View File

@ -16,11 +16,6 @@ var (
var _ influxdb.OnboardingService = (*Service)(nil)
// initializeOnboarding ensures the onboarding bucket exists within the
// given transaction.
func (s *Service) initializeOnboarding(ctx context.Context, tx Tx) error {
	if _, err := tx.Bucket(onboardingBucket); err != nil {
		return err
	}
	return nil
}
// IsOnboarding means if the initial setup of influxdb has happened.
// true means that the onboarding setup has not yet happened.
// false means that the onboarding has been completed.

View File

@ -27,7 +27,8 @@ func initBoltOnboardingService(f influxdbtesting.OnboardingFields, t *testing.T)
}
}
func initOnboardingService(s kv.Store, f influxdbtesting.OnboardingFields, t *testing.T) (influxdb.OnboardingService, func()) {
func initOnboardingService(s kv.SchemaStore, f influxdbtesting.OnboardingFields, t *testing.T) (influxdb.OnboardingService, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
svc.OrgBucketIDs = f.IDGenerator
@ -37,11 +38,6 @@ func initOnboardingService(s kv.Store, f influxdbtesting.OnboardingFields, t *te
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("unable to initialize kv store: %v", err)
}
t.Logf("Onboarding: %v", f.IsOnboarding)
if err := svc.PutOnboardingStatus(ctx, !f.IsOnboarding); err != nil {
t.Fatalf("failed to set new onboarding finished: %v", err)

View File

@ -38,16 +38,6 @@ var ErrFailureGeneratingID = &influxdb.Error{
var _ influxdb.OrganizationService = (*Service)(nil)
var _ influxdb.OrganizationOperationLogService = (*Service)(nil)
// initializeOrgs ensures both the organization bucket and its index
// bucket are reachable within the given transaction.
func (s *Service) initializeOrgs(ctx context.Context, tx Tx) error {
	for _, bkt := range [][]byte{organizationBucket, organizationIndex} {
		if _, err := tx.Bucket(bkt); err != nil {
			return err
		}
	}
	return nil
}
// FindOrganizationByID retrieves a organization by id.
func (s *Service) FindOrganizationByID(ctx context.Context, id influxdb.ID) (*influxdb.Organization, error) {
var o *influxdb.Organization

View File

@ -27,7 +27,8 @@ func initBoltOrganizationService(f influxdbtesting.OrganizationFields, t *testin
}
}
func initOrganizationService(s kv.Store, f influxdbtesting.OrganizationFields, t *testing.T) (influxdb.OrganizationService, string, func()) {
func initOrganizationService(s kv.SchemaStore, f influxdbtesting.OrganizationFields, t *testing.T) (influxdb.OrganizationService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.OrgBucketIDs = f.OrgBucketIDs
svc.IDGenerator = f.IDGenerator
@ -36,11 +37,6 @@ func initOrganizationService(s kv.Store, f influxdbtesting.OrganizationFields, t
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing organization service: %v", err)
}
for _, u := range f.Organizations {
if err := svc.PutOrganization(ctx, u); err != nil {
t.Fatalf("failed to populate organizations")

View File

@ -72,11 +72,6 @@ var (
var _ influxdb.PasswordsService = (*Service)(nil)
// initializePasswords ensures the user password bucket exists within
// the given transaction.
func (s *Service) initializePasswords(ctx context.Context, tx Tx) error {
	if _, err := tx.Bucket(userpasswordBucket); err != nil {
		return err
	}
	return nil
}
// CompareAndSetPassword checks the password and if they match
// updates to the new password.
func (s *Service) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, old string, new string) error {

View File

@ -30,15 +30,11 @@ func initBoltPasswordsService(f influxdbtesting.PasswordFields, t *testing.T) (i
}
}
func initPasswordsService(s kv.Store, f influxdbtesting.PasswordFields, t *testing.T) (influxdb.PasswordsService, func()) {
func initPasswordsService(s kv.SchemaStore, f influxdbtesting.PasswordFields, t *testing.T) (influxdb.PasswordsService, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing authorization service: %v", err)
}
for _, u := range f.Users {
if err := svc.PutUser(ctx, u); err != nil {

View File

@ -81,11 +81,6 @@ var (
var _ influxdb.ScraperTargetStoreService = (*Service)(nil)
// initializeScraperTargets ensures the scraper targets bucket is
// reachable within the given transaction.
func (s *Service) initializeScraperTargets(ctx context.Context, tx Tx) error {
	if _, err := s.scrapersBucket(tx); err != nil {
		return err
	}
	return nil
}
func (s *Service) scrapersBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket([]byte(scrapersBucket))
if err != nil {

View File

@ -27,15 +27,11 @@ func initBoltTargetService(f influxdbtesting.TargetFields, t *testing.T) (influx
}
}
func initScraperTargetStoreService(s kv.Store, f influxdbtesting.TargetFields, t *testing.T) (influxdb.ScraperTargetStoreService, string, func()) {
func initScraperTargetStoreService(s kv.SchemaStore, f influxdbtesting.TargetFields, t *testing.T) (influxdb.ScraperTargetStoreService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing user service: %v", err)
}
for _, target := range f.Targets {
if err := svc.PutTarget(ctx, target); err != nil {
t.Fatalf("failed to populate targets: %v", err)

View File

@ -10,17 +10,10 @@ import (
var (
secretBucket = []byte("secretsv1")
_ influxdb.SecretService = (*Service)(nil)
)
var _ influxdb.SecretService = (*Service)(nil)
// initializeSecrets ensures the secrets bucket exists within the given
// transaction.
func (s *Service) initializeSecrets(ctx context.Context, tx Tx) error {
	_, err := tx.Bucket(secretBucket)
	return err
}
// LoadSecret retrieves the secret value v found at key k for organization orgID.
func (s *Service) LoadSecret(ctx context.Context, orgID influxdb.ID, k string) (string, error) {
var v string

View File

@ -27,12 +27,9 @@ func initBoltSecretService(f influxdbtesting.SecretServiceFields, t *testing.T)
}
}
func initSecretService(s kv.Store, f influxdbtesting.SecretServiceFields, t *testing.T) (influxdb.SecretService, func()) {
svc := kv.NewService(zaptest.NewLogger(t), s)
func initSecretService(s kv.SchemaStore, f influxdbtesting.SecretServiceFields, t *testing.T) (influxdb.SecretService, func()) {
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing secret service: %v", err)
}
svc := kv.NewService(zaptest.NewLogger(t), s)
for _, s := range f.Secrets {
for k, v := range s.Env {

View File

@ -2,7 +2,6 @@ package kv
import (
"context"
"encoding/json"
"time"
"github.com/benbjohnson/clock"
@ -49,8 +48,6 @@ type Service struct {
endpointStore *IndexStore
variableStore *IndexStore
Migrator *Migrator
urmByUserIndex *Index
disableAuthorizationsForMaxPermissions func(context.Context) bool
@ -71,50 +68,12 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
checkStore: newCheckStore(),
endpointStore: newEndpointStore(),
variableStore: newVariableStore(),
Migrator: NewMigrator(log),
urmByUserIndex: NewIndex(NewIndexMapping(
urmBucket,
urmByUserIndexBucket,
func(v []byte) ([]byte, error) {
var urm influxdb.UserResourceMapping
if err := json.Unmarshal(v, &urm); err != nil {
return nil, err
}
id, _ := urm.UserID.Encode()
return id, nil
},
), WithIndexReadPathEnabled),
urmByUserIndex: NewIndex(URMByUserIndexMapping, WithIndexReadPathEnabled),
disableAuthorizationsForMaxPermissions: func(context.Context) bool {
return false
},
}
// kv service migrations
s.Migrator.AddMigrations(
// initial migration is the state of the world when
// the migrator was introduced.
NewAnonymousMigration(
"initial migration",
s.initializeAll,
// down is a noop
func(context.Context, Store) error {
return nil
},
),
// add index user resource mappings by user id
s.urmByUserIndex.Migration(),
NewAnonymousMigration(
"migrate task owner id",
s.TaskOwnerIDUpMigration,
func(context.Context, Store) error {
return nil
},
),
// and new migrations below here (and move this comment down):
)
if len(configs) > 0 {
s.Config = configs[0]
}
@ -134,135 +93,9 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
// ServiceConfig allows us to configure Services
type ServiceConfig struct {
SessionLength time.Duration
Clock clock.Clock
URMByUserIndexReadPathEnabled bool
FluxLanguageService influxdb.FluxLanguageService
}
// AutoMigrationStore is a Store which also describes whether or not
// migrations can be applied automatically.
// Given the AutoMigrate method is defined and it returns a non-nil kv.Store
// implementation, then it will automatically invoke migrator.Up(store)
// on the returned kv.Store during Service.Initialize(...).
type AutoMigrationStore interface {
Store
AutoMigrate() Store
}
// Initialize runs the migrator's own setup against the service's store
// and, when the store opts in via AutoMigrationStore and returns a
// non-nil store from AutoMigrate, applies all registered migrations to
// that store automatically.
func (s *Service) Initialize(ctx context.Context) error {
if err := s.Migrator.Initialize(ctx, s.kv); err != nil {
return err
}
// if store implements auto migrate and the resulting Store from
// AutoMigrate() is non-nil, apply migrator.Up() to the resulting store.
if store, ok := s.kv.(AutoMigrationStore); ok {
if migrateStore := store.AutoMigrate(); migrateStore != nil {
return s.Migrator.Up(ctx, migrateStore)
}
}
return nil
}
func (s *Service) initializeAll(ctx context.Context, store Store) error {
// please do not initialize anymore buckets here
// add them as a new migration to the list of migrations
// defined in NewService.
if err := store.Update(ctx, func(tx Tx) error {
if err := s.initializeAuths(ctx, tx); err != nil {
return err
}
if err := s.initializeDocuments(ctx, tx); err != nil {
return err
}
if err := s.initializeBuckets(ctx, tx); err != nil {
return err
}
if err := s.initializeDashboards(ctx, tx); err != nil {
return err
}
if err := s.initializeKVLog(ctx, tx); err != nil {
return err
}
if err := s.initializeLabels(ctx, tx); err != nil {
return err
}
if err := s.initializeOnboarding(ctx, tx); err != nil {
return err
}
if err := s.initializeOrgs(ctx, tx); err != nil {
return err
}
if err := s.initializeTasks(ctx, tx); err != nil {
return err
}
if err := s.initializePasswords(ctx, tx); err != nil {
return err
}
if err := s.initializeScraperTargets(ctx, tx); err != nil {
return err
}
if err := s.initializeSecrets(ctx, tx); err != nil {
return err
}
if err := s.initializeSessions(ctx, tx); err != nil {
return err
}
if err := s.initializeSources(ctx, tx); err != nil {
return err
}
if err := s.initializeTelegraf(ctx, tx); err != nil {
return err
}
if err := s.initializeURMs(ctx, tx); err != nil {
return err
}
if err := s.variableStore.Init(ctx, tx); err != nil {
return err
}
if err := s.initializeVariablesOrgIndex(tx); err != nil {
return err
}
if err := s.checkStore.Init(ctx, tx); err != nil {
return err
}
if err := s.initializeNotificationRule(ctx, tx); err != nil {
return err
}
if err := s.endpointStore.Init(ctx, tx); err != nil {
return err
}
return s.initializeUsers(ctx, tx)
}); err != nil {
return err
}
return nil
SessionLength time.Duration
Clock clock.Clock
FluxLanguageService influxdb.FluxLanguageService
}
// WithResourceLogger sets the resource audit logger for the service.

View File

@ -14,13 +14,6 @@ var (
var _ influxdb.SessionService = (*Service)(nil)
// initializeSessions ensures the session bucket exists within the given
// transaction.
func (s *Service) initializeSessions(ctx context.Context, tx Tx) error {
	_, err := tx.Bucket([]byte(sessionBucket))
	return err
}
// RenewSession extends the expire time to newExpiration.
func (s *Service) RenewSession(ctx context.Context, session *influxdb.Session, newExpiration time.Time) error {
if session == nil {

View File

@ -27,16 +27,12 @@ func initBoltSessionService(f influxdbtesting.SessionFields, t *testing.T) (infl
}
}
func initSessionService(s kv.Store, f influxdbtesting.SessionFields, t *testing.T) (influxdb.SessionService, string, func()) {
func initSessionService(s kv.SchemaStore, f influxdbtesting.SessionFields, t *testing.T) (influxdb.SessionService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
svc.TokenGenerator = f.TokenGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing session service: %v", err)
}
for _, u := range f.Users {
if err := svc.PutUser(ctx, u); err != nil {
t.Fatalf("failed to populate users")

View File

@ -36,25 +36,6 @@ func init() {
}
}
// initializeSources ensures the source bucket exists and seeds it with
// DefaultSource when no source with that ID is present yet.
func (s *Service) initializeSources(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(sourceBucket); err != nil {
return err
}
// Any lookup failure other than not-found is a real error.
_, pe := s.findSourceByID(ctx, tx, DefaultSource.ID)
if pe != nil && influxdb.ErrorCode(pe) != influxdb.ENotFound {
return pe
}
// Seed the default source only when it does not already exist.
if influxdb.ErrorCode(pe) == influxdb.ENotFound {
if err := s.putSource(ctx, tx, &DefaultSource); err != nil {
return err
}
}
return nil
}
// DefaultSource retrieves the default source.
func (s *Service) DefaultSource(ctx context.Context) (*influxdb.Source, error) {
var sr *influxdb.Source

View File

@ -30,14 +30,11 @@ func initBoltSourceService(f influxdbtesting.SourceFields, t *testing.T) (influx
}
}
func initSourceService(s kv.Store, f influxdbtesting.SourceFields, t *testing.T) (influxdb.SourceService, string, func()) {
func initSourceService(s kv.SchemaStore, f influxdbtesting.SourceFields, t *testing.T) (influxdb.SourceService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing source service: %v", err)
}
for _, b := range f.Sources {
if err := svc.PutSource(ctx, b); err != nil {
t.Fatalf("failed to populate sources")

View File

@ -9,6 +9,8 @@ import (
var (
// ErrKeyNotFound is the error returned when the key requested is not found.
ErrKeyNotFound = errors.New("key not found")
// ErrBucketNotFound is the error returned when the bucket cannot be found.
ErrBucketNotFound = errors.New("bucket not found")
// ErrTxNotWritable is the error returned when an mutable operation is called during
// a non-writable transaction.
ErrTxNotWritable = errors.New("transaction is not writable")
@ -22,6 +24,23 @@ func IsNotFound(err error) bool {
return err == ErrKeyNotFound
}
// SchemaStore is a superset of Store along with store schema change
// functionality like bucket creation and deletion.
//
// This type is made available via the `kv/migration` package.
// It should be consumed via this package to create and delete buckets using a migration.
// Check out the internal tool `cmd/internal/kvmigrate` for building a new migration Go file into
// the correct location (in kv/migration/all.go).
// Configuring your bucket here will ensure it is created properly on initialization of InfluxDB.
type SchemaStore interface {
Store
// CreateBucket creates a bucket on the underlying store if it does not exist.
CreateBucket(ctx context.Context, bucket []byte) error
// DeleteBucket deletes a bucket on the underlying store if it exists.
DeleteBucket(ctx context.Context, bucket []byte) error
}
// Store is an interface for a generic key value store. It is modeled after
// the boltdb database struct.
type Store interface {

View File

@ -129,21 +129,6 @@ func (s *StoreBase) EntKey(ctx context.Context, ent Entity) ([]byte, error) {
return s.encodeEnt(ctx, ent, s.EncodeEntKeyFn)
}
// Init verifies the store's bucket can be opened within the supplied
// transaction, wrapping any failure in an internal influxdb.Error that
// names the bucket.
func (s *StoreBase) Init(ctx context.Context, tx Tx) error {
	span, ctx := s.startSpan(ctx)
	defer span.Finish()

	_, err := s.bucket(ctx, tx)
	if err == nil {
		return nil
	}
	return &influxdb.Error{
		Code: influxdb.EInternal,
		Msg:  fmt.Sprintf("failed to create bucket: %s", string(s.BktName)),
		Err:  err,
	}
}
type (
// DeleteOpts provides indicators to the store.Delete call for deleting a given
// entity. The FilterFn indicates the current value should be deleted when returning

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -20,14 +21,16 @@ func TestStoreBase(t *testing.T) {
inmemSVC, done, err := NewTestBoltStore(t)
require.NoError(t, err)
store := kv.NewStoreBase("foo", []byte("foo_"+bktSuffix), encKeyFn, encBodyFn, decFn, decToEntFn)
bucket := []byte("foo_" + bktSuffix)
store := kv.NewStoreBase("foo", bucket, encKeyFn, encBodyFn, decFn, decToEntFn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
require.NoError(t, inmemSVC.Update(ctx, func(tx kv.Tx) error {
return store.Init(ctx, tx)
}))
migrationName := fmt.Sprintf("create bucket %q", string(bucket))
migration.CreateBuckets(migrationName, bucket).Up(ctx, inmemSVC)
require.NoError(t, err)
return store, done, inmemSVC
}

View File

@ -21,23 +21,6 @@ type IndexStore struct {
IndexStore *StoreBase
}
// Init initializes the entity store and then the index store inside the
// given transaction, returning the first error encountered.
func (s *IndexStore) Init(ctx context.Context, tx Tx) error {
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if err := s.EntStore.Init(ctx, tx); err != nil {
		return err
	}
	return s.IndexStore.Init(ctx, tx)
}
// Delete deletes entities and associated indexes.
func (s *IndexStore) Delete(ctx context.Context, tx Tx, opts DeleteOpts) error {
span, ctx := tracing.StartSpanFromContext(ctx)

View File

@ -6,6 +6,7 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -23,10 +24,18 @@ func TestIndexStore(t *testing.T) {
const resource = "foo"
bucketName := []byte("foo_ent_" + bktSuffix)
indexBucketName := []byte("foo_idx+" + bktSuffix)
ctx := context.Background()
if err := migration.CreateBuckets("add foo buckets", bucketName, indexBucketName).Up(ctx, kvStoreStore); err != nil {
t.Fatal(err)
}
indexStore := &kv.IndexStore{
Resource: resource,
EntStore: newStoreBase(resource, []byte("foo_ent_"+bktSuffix), kv.EncIDKey, kv.EncBodyJSON, decJSONFooFn, decFooEntFn),
IndexStore: kv.NewOrgNameKeyStore(resource, []byte("foo_idx_"+bktSuffix), false),
EntStore: newStoreBase(resource, bucketName, kv.EncIDKey, kv.EncBodyJSON, decJSONFooFn, decFooEntFn),
IndexStore: kv.NewOrgNameKeyStore(resource, indexBucketName, false),
}
return indexStore, done, kvStoreStore

View File

@ -81,19 +81,6 @@ func kvToInfluxTask(k *kvTask) *influxdb.Task {
}
}
// initializeTasks ensures the task, task-run, and task-index buckets
// are all reachable within the given transaction.
func (s *Service) initializeTasks(ctx context.Context, tx Tx) error {
	for _, bkt := range [][]byte{taskBucket, taskRunBucket, taskIndexBucket} {
		if _, err := tx.Bucket(bkt); err != nil {
			return err
		}
	}
	return nil
}
// FindTaskByID returns a single task
func (s *Service) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
var t *influxdb.Task
@ -1779,148 +1766,6 @@ func taskRunKey(taskID, runID influxdb.ID) ([]byte, error) {
return []byte(string(encodedID) + "/" + string(encodedRunID)), nil
}
// TaskOwnerIDUpMigration back-fills the OwnerID field on any stored task
// that is missing one. It first scans the task bucket in a read-only
// transaction to collect ownerless tasks, then for each such task opens a
// write transaction and tries to resolve an owner: first from the
// authorization referenced by the task's authorizationID, then from the
// org-owner user resource mapping. If neither source yields a valid
// owner the migration fails with an internal error.
func (s *Service) TaskOwnerIDUpMigration(ctx context.Context, store Store) error {
var ownerlessTasks []*influxdb.Task
// loop through the tasks and collect a set of tasks that are missing the owner id.
err := store.View(ctx, func(tx Tx) error {
taskBucket, err := tx.Bucket(taskBucket)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
c, err := taskBucket.ForwardCursor([]byte{})
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
for k, v := c.Next(); k != nil; k, v = c.Next() {
kvTask := &kvTask{}
if err := json.Unmarshal(v, kvTask); err != nil {
return influxdb.ErrInternalTaskServiceError(err)
}
t := kvToInfluxTask(kvTask)
if !t.OwnerID.Valid() {
ownerlessTasks = append(ownerlessTasks, t)
}
}
if err := c.Err(); err != nil {
return err
}
return c.Close()
})
if err != nil {
return err
}
// loop through tasks
for _, t := range ownerlessTasks {
// open transaction
err := store.Update(ctx, func(tx Tx) error {
taskKey, err := taskKey(t.ID)
if err != nil {
return err
}
b, err := tx.Bucket(taskBucket)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
if !t.OwnerID.Valid() {
// re-read the raw task so we can pull its authorizationID,
// which kvToInfluxTask does not carry.
v, err := b.Get(taskKey)
if IsNotFound(err) {
return influxdb.ErrTaskNotFound
}
authType := struct {
AuthorizationID influxdb.ID `json:"authorizationID"`
}{}
if err := json.Unmarshal(v, &authType); err != nil {
return influxdb.ErrInternalTaskServiceError(err)
}
// try populating the owner from auth
encodedID, err := authType.AuthorizationID.Encode()
if err == nil {
authBucket, err := tx.Bucket([]byte("authorizationsv1"))
if err != nil {
return err
}
a, err := authBucket.Get(encodedID)
if err == nil {
// a lookup miss here is tolerated; the URM fallback below
// gets a chance to resolve the owner instead.
auth := &influxdb.Authorization{}
if err := json.Unmarshal(a, auth); err != nil {
return err
}
t.OwnerID = auth.GetUserID()
}
}
}
// try populating owner from urm
if !t.OwnerID.Valid() {
b, err := tx.Bucket([]byte("userresourcemappingsv1"))
if err != nil {
return err
}
id, err := t.OrganizationID.Encode()
if err != nil {
return err
}
// scan mappings scoped to the task's org and take the first
// org-owner user as the task owner.
cur, err := b.ForwardCursor(id, WithCursorPrefix(id))
if err != nil {
return err
}
for k, v := cur.Next(); k != nil; k, v = cur.Next() {
m := &influxdb.UserResourceMapping{}
if err := json.Unmarshal(v, m); err != nil {
return err
}
if m.ResourceID == t.OrganizationID && m.ResourceType == influxdb.OrgsResourceType && m.UserType == influxdb.Owner {
t.OwnerID = m.UserID
break
}
}
if err := cur.Close(); err != nil {
return err
}
}
// if population fails return error
if !t.OwnerID.Valid() {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: "could not populate owner ID for task",
}
}
// save task
taskBytes, err := json.Marshal(t)
if err != nil {
return influxdb.ErrInternalTaskServiceError(err)
}
err = b.Put(taskKey, taskBytes)
if err != nil {
return influxdb.ErrUnexpectedTaskBucketErr(err)
}
return nil
})
if err != nil {
return err
}
}
return nil
}
var taskOptionsPattern = regexp.MustCompile(`option\s+task\s*=\s*{.*}`)
// ExtractTaskOptions is a feature-flag driven switch between normal options

View File

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"testing"
"time"
@ -33,13 +32,10 @@ func TestBoltTaskService(t *testing.T) {
t.Fatal(err)
}
ctx, cancelFunc := context.WithCancel(context.Background())
service := kv.NewService(zaptest.NewLogger(t), store, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
ctx, cancelFunc := context.WithCancel(context.Background())
if err := service.Initialize(ctx); err != nil {
t.Fatalf("error initializing urm service: %v", err)
}
go func() {
<-ctx.Done()
@ -79,21 +75,23 @@ func newService(t *testing.T, ctx context.Context, c clock.Clock) *testService {
c = clock.New()
}
ts := &testService{}
var err error
ts.Store, ts.storeCloseFn, err = NewTestInmemStore(t)
var (
ts = &testService{}
err error
store kv.SchemaStore
)
store, ts.storeCloseFn, err = NewTestInmemStore(t)
if err != nil {
t.Fatal("failed to create InmemStore", err)
}
ts.Service = kv.NewService(zaptest.NewLogger(t), ts.Store, kv.ServiceConfig{
ts.Store = store
ts.Service = kv.NewService(zaptest.NewLogger(t), store, kv.ServiceConfig{
Clock: c,
FluxLanguageService: fluxlang.DefaultService,
})
err = ts.Service.Initialize(ctx)
if err != nil {
t.Fatal("Service.Initialize", err)
}
ts.User = influxdb.User{Name: t.Name() + "-user"}
if err := ts.Service.CreateUser(ctx, &ts.User); err != nil {
@ -257,14 +255,13 @@ func TestTaskRunCancellation(t *testing.T) {
}
defer close()
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
service := kv.NewService(zaptest.NewLogger(t), store, kv.ServiceConfig{
FluxLanguageService: fluxlang.DefaultService,
})
ctx, cancelFunc := context.WithCancel(context.Background())
if err := service.Initialize(ctx); err != nil {
t.Fatalf("error initializing urm service: %v", err)
}
defer cancelFunc()
u := &influxdb.User{Name: t.Name() + "-user"}
if err := service.CreateUser(ctx, u); err != nil {
t.Fatal(err)
@ -322,77 +319,6 @@ func TestTaskRunCancellation(t *testing.T) {
}
}
// TestTaskMigrate verifies TaskOwnerIDUpMigration fills in a task's
// missing OwnerID via both resolution paths: from the task's stored
// authorizationID, and — when no auth is present — from the org-owner
// user resource mapping.
func TestTaskMigrate(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
ts := newService(t, ctx, nil)
defer ts.Close()
id := "05da585043e02000"
// create a task that has auth set and no ownerID
err := ts.Store.Update(context.Background(), func(tx kv.Tx) error {
b, err := tx.Bucket([]byte("tasksv1"))
if err != nil {
t.Fatal(err)
}
taskBody := fmt.Sprintf(`{"id":"05da585043e02000","type":"system","orgID":"05d3ae3492c9c000","org":"whos","authorizationID":"%s","name":"asdf","status":"active","flux":"option v = {\n bucket: \"bucks\",\n timeRangeStart: -1h,\n timeRangeStop: now()\n}\n\noption task = { \n name: \"asdf\",\n every: 5m,\n}\n\nfrom(bucket: \"_monitoring\")\n |\u003e range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"boltdb_reads_total\")\n |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"counter\")\n |\u003e to(bucket: \"bucks\", org: \"whos\")","every":"5m","latestCompleted":"2020-06-16T17:01:26.083319Z","latestScheduled":"2020-06-16T17:01:26.083319Z","lastRunStatus":"success","createdAt":"2020-06-15T19:10:29Z","updatedAt":"0001-01-01T00:00:00Z"}`, ts.Auth.ID.String())
err = b.Put([]byte(id), []byte(taskBody))
if err != nil {
t.Fatal(err)
}
return nil
})
if err != nil {
t.Fatal(err)
}
// run the migration; the owner should be resolved from the auth.
err = ts.Service.TaskOwnerIDUpMigration(context.Background(), ts.Store)
if err != nil {
t.Fatal(err)
}
idType, _ := influxdb.IDFromString(id)
task, err := ts.Service.FindTaskByID(context.Background(), *idType)
if err != nil {
t.Fatal(err)
}
if task.OwnerID != ts.User.ID {
t.Fatal("failed to fill in ownerID")
}
// create a task that has no auth or owner id but a urm exists
err = ts.Store.Update(context.Background(), func(tx kv.Tx) error {
b, err := tx.Bucket([]byte("tasksv1"))
if err != nil {
t.Fatal(err)
}
taskBody := fmt.Sprintf(`{"id":"05da585043e02000","type":"system","orgID":"%s","org":"whos","name":"asdf","status":"active","flux":"option v = {\n bucket: \"bucks\",\n timeRangeStart: -1h,\n timeRangeStop: now()\n}\n\noption task = { \n name: \"asdf\",\n every: 5m,\n}\n\nfrom(bucket: \"_monitoring\")\n |\u003e range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"boltdb_reads_total\")\n |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"counter\")\n |\u003e to(bucket: \"bucks\", org: \"whos\")","every":"5m","latestCompleted":"2020-06-16T17:01:26.083319Z","latestScheduled":"2020-06-16T17:01:26.083319Z","lastRunStatus":"success","createdAt":"2020-06-15T19:10:29Z","updatedAt":"0001-01-01T00:00:00Z"}`, ts.Org.ID.String())
err = b.Put([]byte(id), []byte(taskBody))
if err != nil {
t.Fatal(err)
}
return nil
})
if err != nil {
t.Fatal(err)
}
// run the migration again; the owner should come from the org's URM.
err = ts.Service.TaskOwnerIDUpMigration(context.Background(), ts.Store)
if err != nil {
t.Fatal(err)
}
task, err = ts.Service.FindTaskByID(context.Background(), *idType)
if err != nil {
t.Fatal(err)
}
if task.OwnerID != ts.User.ID {
t.Fatal("failed to fill in ownerID")
}
}
type taskOptions struct {
name string
every string

View File

@ -74,16 +74,6 @@ var (
var _ influxdb.TelegrafConfigStore = (*Service)(nil)
// initializeTelegraf ensures both the telegraf config bucket and the
// telegraf plugins bucket are reachable within the given transaction.
func (s *Service) initializeTelegraf(ctx context.Context, tx Tx) error {
	if _, err := s.telegrafBucket(tx); err != nil {
		return err
	}
	_, err := s.telegrafPluginsBucket(tx)
	return err
}
func (s *Service) telegrafBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(telegrafBucket)
if err != nil {

View File

@ -27,15 +27,11 @@ func initBoltTelegrafService(f influxdbtesting.TelegrafConfigFields, t *testing.
}
}
func initTelegrafService(s kv.Store, f influxdbtesting.TelegrafConfigFields, t *testing.T) (influxdb.TelegrafConfigStore, func()) {
func initTelegrafService(s kv.SchemaStore, f influxdbtesting.TelegrafConfigFields, t *testing.T) (influxdb.TelegrafConfigStore, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing user service: %v", err)
}
for _, tc := range f.TelegrafConfigs {
if err := svc.PutTelegrafConfig(ctx, tc); err != nil {
t.Fatalf("failed to populate telegraf config: %v", err)

View File

@ -24,6 +24,7 @@ func initBoltTenantService(t *testing.T, f itesting.TenantFields) (influxdb.Tena
t.Fatalf("failed to create new bolt kv store: %v", err)
}
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
// Create a mapping from user-specified IDs to kv generated ones.
@ -32,10 +33,6 @@ func initBoltTenantService(t *testing.T, f itesting.TenantFields) (influxdb.Tena
oIDs := make(map[influxdb.ID]influxdb.ID, len(f.Organizations))
bIDs := make(map[influxdb.ID]influxdb.ID, len(f.Buckets))
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing authorization service: %v", err)
}
for _, u := range f.Users {
id := u.ID
if err := svc.CreateUser(ctx, u); err != nil {

View File

@ -12,8 +12,7 @@ import (
)
var (
urmBucket = []byte("userresourcemappingsv1")
urmByUserIndexBucket = []byte("userresourcemappingsbyuserindexv1")
urmBucket = []byte("userresourcemappingsv1")
// ErrInvalidURMID is used when the service was provided
// an invalid ID format.
@ -27,6 +26,22 @@ var (
Msg: "user to resource mapping not found",
Code: influxdb.ENotFound,
}
// URMByUserIndexMapping is the mapping description of an index
// between a user and a URM.
URMByUserIndexMapping = NewIndexMapping(
urmBucket,
[]byte("userresourcemappingsbyuserindexv1"),
func(v []byte) ([]byte, error) {
var urm influxdb.UserResourceMapping
if err := json.Unmarshal(v, &urm); err != nil {
return nil, err
}
id, _ := urm.UserID.Encode()
return id, nil
},
)
)
// UnavailableURMServiceError is used if we aren't able to interact with the
@ -66,13 +81,6 @@ func NonUniqueMappingError(userID influxdb.ID) error {
}
}
// initializeURMs ensures the user resource mapping bucket is reachable,
// translating any failure into an unavailable-service error.
func (s *Service) initializeURMs(ctx context.Context, tx Tx) error {
	_, err := tx.Bucket(urmBucket)
	if err != nil {
		return UnavailableURMServiceError(err)
	}
	return nil
}
func filterMappingsFn(filter influxdb.UserResourceMappingFilter) func(m *influxdb.UserResourceMapping) bool {
return func(mapping *influxdb.UserResourceMapping) bool {
return (!filter.UserID.Valid() || (filter.UserID == mapping.UserID)) &&

View File

@ -13,6 +13,7 @@ import (
)
type testable interface {
Helper()
Logf(string, ...interface{})
Error(args ...interface{})
Errorf(string, ...interface{})
@ -34,7 +35,7 @@ func TestInmemUserResourceMappingService(t *testing.T) {
type userResourceMappingTestFunc func(influxdbtesting.UserResourceFields, *testing.T) (influxdb.UserResourceMappingService, func())
func initURMServiceFunc(storeFn func(*testing.T) (kv.Store, func(), error), confs ...kv.ServiceConfig) userResourceMappingTestFunc {
func initURMServiceFunc(storeFn func(*testing.T) (kv.SchemaStore, func(), error), confs ...kv.ServiceConfig) userResourceMappingTestFunc {
return func(f influxdbtesting.UserResourceFields, t *testing.T) (influxdb.UserResourceMappingService, func()) {
s, closeStore, err := storeFn(t)
if err != nil {
@ -49,13 +50,9 @@ func initURMServiceFunc(storeFn func(*testing.T) (kv.Store, func(), error), conf
}
}
func initUserResourceMappingService(s kv.Store, f influxdbtesting.UserResourceFields, t testable, configs ...kv.ServiceConfig) (influxdb.UserResourceMappingService, func()) {
svc := kv.NewService(zaptest.NewLogger(t), s, configs...)
func initUserResourceMappingService(s kv.SchemaStore, f influxdbtesting.UserResourceFields, t testable, configs ...kv.ServiceConfig) (influxdb.UserResourceMappingService, func()) {
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing urm service: %v", err)
}
svc := kv.NewService(zaptest.NewLogger(t), s, configs...)
for _, o := range f.Organizations {
if err := svc.CreateOrganization(ctx, o); err != nil {

View File

@ -18,18 +18,6 @@ var (
var _ influxdb.UserService = (*Service)(nil)
var _ influxdb.UserOperationLogService = (*Service)(nil)
// Initialize creates the buckets for the user service.
func (s *Service) initializeUsers(ctx context.Context, tx Tx) error {
if _, err := s.userBucket(tx); err != nil {
return err
}
if _, err := s.userIndexBucket(tx); err != nil {
return err
}
return nil
}
func (s *Service) userBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket([]byte(userBucket))
if err != nil {

View File

@ -27,15 +27,11 @@ func initBoltUserService(f influxdbtesting.UserFields, t *testing.T) (influxdb.U
}
}
func initUserService(s kv.Store, f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) {
func initUserService(s kv.SchemaStore, f influxdbtesting.UserFields, t *testing.T) (influxdb.UserService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing user service: %v", err)
}
for _, u := range f.Users {
if err := svc.PutUser(ctx, u); err != nil {
t.Fatalf("failed to populate users")

View File

@ -8,15 +8,12 @@ import (
"github.com/influxdata/influxdb/v2"
)
// TODO: eradicate this with migration strategy
var variableOrgsIndex = []byte("variableorgsv1")
func (s *Service) initializeVariablesOrgIndex(tx Tx) error {
if _, err := tx.Bucket(variableOrgsIndex); err != nil {
return err
}
return nil
}
var (
variableBucket = []byte("variablesv1")
variableIndexBucket = []byte("variablesindexv1")
// TODO: eradicate this with migration strategy
variableOrgsIndex = []byte("variableorgsv1")
)
func decodeVariableOrgsIndexKey(indexKey []byte) (orgID influxdb.ID, variableID influxdb.ID, err error) {
if len(indexKey) != 2*influxdb.IDLength {
@ -101,8 +98,8 @@ func newVariableStore() *IndexStore {
return &IndexStore{
Resource: resource,
EntStore: NewStoreBase(resource, []byte("variablesv1"), EncIDKey, EncBodyJSON, decodeVarEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, []byte("variablesindexv1"), false),
EntStore: NewStoreBase(resource, variableBucket, EncIDKey, EncBodyJSON, decodeVarEntFn, decValToEntFn),
IndexStore: NewOrgNameKeyStore(resource, variableIndexBucket, false),
}
}

View File

@ -27,19 +27,15 @@ func initBoltVariableService(f influxdbtesting.VariableFields, t *testing.T) (in
}
}
func initVariableService(s kv.Store, f influxdbtesting.VariableFields, t *testing.T) (influxdb.VariableService, string, func()) {
func initVariableService(s kv.SchemaStore, f influxdbtesting.VariableFields, t *testing.T) (influxdb.VariableService, string, func()) {
ctx := context.Background()
svc := kv.NewService(zaptest.NewLogger(t), s)
svc.IDGenerator = f.IDGenerator
svc.TimeGenerator = f.TimeGenerator
if svc.TimeGenerator == nil {
svc.TimeGenerator = influxdb.RealTimeGenerator{}
}
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing variable service: %v", err)
}
for _, variable := range f.Variables {
if err := svc.ReplaceVariable(ctx, variable); err != nil {
t.Fatalf("failed to populate test variables: %v", err)

View File

@ -30,9 +30,6 @@ func initOldLabelService(s kv.Store, f influxdbtesting.LabelFields, t *testing.T
svc.IDGenerator = f.IDGenerator
ctx := context.Background()
if err := svc.Initialize(ctx); err != nil {
t.Fatalf("error initializing label service: %v", err)
}
for _, l := range f.Labels {
if err := svc.PutLabel(ctx, l); err != nil {
t.Fatalf("failed to populate labels: %v", err)

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/label"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"go.uber.org/zap/zaptest"
@ -20,6 +21,8 @@ func TestBoltLabelService(t *testing.T) {
}
func NewTestBoltStore(t *testing.T) (kv.Store, func(), error) {
t.Helper()
f, err := ioutil.TempFile("", "influxdata-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
@ -27,8 +30,14 @@ func NewTestBoltStore(t *testing.T) (kv.Store, func(), error) {
f.Close()
path := f.Name()
s := bolt.NewKVStore(zaptest.NewLogger(t), path)
if err := s.Open(context.Background()); err != nil {
ctx := context.Background()
logger := zaptest.NewLogger(t)
s := bolt.NewKVStore(logger, path)
if err := s.Open(ctx); err != nil {
return nil, nil, err
}
if err := all.Up(ctx, logger, s); err != nil {
return nil, nil, err
}

View File

@ -9,14 +9,12 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/label"
"go.uber.org/zap/zaptest"
)
func TestLabels(t *testing.T) {
s := func() kv.Store {
return inmem.NewKVStore()
}
setup := func(t *testing.T, store *label.Store, tx kv.Tx) {
for i := 1; i <= 10; i++ {
err := store.CreateLabel(context.Background(), tx, &influxdb.Label{
@ -227,7 +225,14 @@ func TestLabels(t *testing.T) {
for _, testScenario := range tt {
t.Run(testScenario.name, func(t *testing.T) {
ts, err := label.NewStore(s())
t.Parallel()
store := inmem.NewKVStore()
if err := all.Up(context.Background(), zaptest.NewLogger(t), store); err != nil {
t.Fatal(err)
}
ts, err := label.NewStore(store)
if err != nil {
t.Fatal(err)
}

Some files were not shown because too many files have changed in this diff Show More