feat(kv): support for migrations (#17145)
* feat(kv): migration types for managing kv buckets and indexes over time chore(kv): fixup comments in migrator types fix(kv): initialize migrator bucket on kv service initialize chore(kv): remove currently unused auth index chore(kv): remove currently unused urm index * chore(kv): move migrator tests into testing package and run for inmem and bolt * chore(changelog): update changelog to reflect kv migrator type * fix(kv): update auto migration store to return migratable store * chore(kv): wrap error using func instead of defer in index * chore(kv): rename Name method to MigrationName for clarity * chore(kv): update migration log messages to match influxdb standardpull/17336/head
parent
82d88d52c4
commit
1d400a4f0f
|
@ -24,6 +24,7 @@
|
|||
1. [17138](https://github.com/influxdata/influxdb/pull/17138): Extend pkger export all capabilities to support filtering by lable name and resource type
|
||||
1. [17049](https://github.com/influxdata/influxdb/pull/17049): Added new login and sign-up screen that for cloud users that allows direct login from their region
|
||||
1. [17170](https://github.com/influxdata/influxdb/pull/17170): Added new cli multiple profiles management tool
|
||||
1. [17145](https://github.com/influxdata/influxdb/pull/17145): Update kv.Store to define schema changes via new kv.Migrator types
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
|
|
|
@ -18,6 +18,9 @@ import (
|
|||
// check that *KVStore implement kv.Store interface.
|
||||
var _ kv.Store = (*KVStore)(nil)
|
||||
|
||||
// ensure *KVStore implements kv.AutoMigrationStore.
|
||||
var _ kv.AutoMigrationStore = (*KVStore)(nil)
|
||||
|
||||
// KVStore is a kv.Store backed by boltdb.
|
||||
type KVStore struct {
|
||||
path string
|
||||
|
@ -34,6 +37,11 @@ func NewKVStore(log *zap.Logger, path string) *KVStore {
|
|||
}
|
||||
}
|
||||
|
||||
// AutoMigrate returns itself as it is safe to automatically apply migrations on initialization.
|
||||
func (s *KVStore) AutoMigrate() kv.Store {
|
||||
return s
|
||||
}
|
||||
|
||||
// Open creates boltDB file it doesn't exists and opens it otherwise.
|
||||
func (s *KVStore) Open(ctx context.Context) error {
|
||||
span, _ := tracing.StartSpanFromContext(ctx)
|
||||
|
|
|
@ -14,6 +14,9 @@ import (
|
|||
// ensure *KVStore implement kv.Store interface
|
||||
var _ kv.Store = (*KVStore)(nil)
|
||||
|
||||
// ensure *KVStore implements kv.AutoMigrationStore
|
||||
var _ kv.AutoMigrationStore = (*KVStore)(nil)
|
||||
|
||||
// cursorBatchSize is the size of a batch sent by a forward cursors
|
||||
// tree iterator
|
||||
const cursorBatchSize = 1000
|
||||
|
@ -61,6 +64,11 @@ func (s *KVStore) Backup(ctx context.Context, w io.Writer) error {
|
|||
panic("not implemented")
|
||||
}
|
||||
|
||||
// AutoMigrate returns itlsef as *KVStore is safe to migrate automically on initialize.
|
||||
func (s *KVStore) AutoMigrate() kv.Store {
|
||||
return s
|
||||
}
|
||||
|
||||
// Flush removes all data from the buckets. Used for testing.
|
||||
func (s *KVStore) Flush(ctx context.Context) {
|
||||
s.mu.Lock()
|
||||
|
|
53
kv/index.go
53
kv/index.go
|
@ -141,8 +141,7 @@ func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
|
|||
return index
|
||||
}
|
||||
|
||||
// Initialize creates the bucket if it does not already exist.
|
||||
func (i *Index) Initialize(ctx context.Context, store Store) error {
|
||||
func (i *Index) initialize(ctx context.Context, store Store) error {
|
||||
return store.Update(ctx, func(tx Tx) error {
|
||||
// create bucket if not exist
|
||||
_, err := tx.Bucket(i.IndexBucket())
|
||||
|
@ -180,6 +179,56 @@ func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// ensure IndexMigration implements MigrationSpec
|
||||
var _ MigrationSpec = (*IndexMigration)(nil)
|
||||
|
||||
// IndexMigration is a migration for adding and removing an index.
|
||||
// These are constructed via the Index.Migration function.
|
||||
type IndexMigration struct {
|
||||
*Index
|
||||
opts []PopulateOption
|
||||
}
|
||||
|
||||
// Name returns a readable name for the index migration.
|
||||
func (i *IndexMigration) MigrationName() string {
|
||||
return fmt.Sprintf("add index %q", string(i.IndexBucket()))
|
||||
}
|
||||
|
||||
// Up initializes the index bucket and populates the index.
|
||||
func (i *IndexMigration) Up(ctx context.Context, store Store) (err error) {
|
||||
wrapErr := func(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("migration (up) %s: %w", i.MigrationName(), err)
|
||||
}
|
||||
|
||||
if err = i.initialize(ctx, store); err != nil {
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
_, err = i.Populate(ctx, store, i.opts...)
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
// Down deletes all entries from the index.
|
||||
func (i *IndexMigration) Down(ctx context.Context, store Store) error {
|
||||
if err := i.DeleteAll(ctx, store); err != nil {
|
||||
return fmt.Errorf("migration (down) %s: %w", i.MigrationName(), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Migration creates an IndexMigration for the underlying Index.
|
||||
func (i *Index) Migration(opts ...PopulateOption) *IndexMigration {
|
||||
return &IndexMigration{
|
||||
Index: i,
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
|
||||
// Insert creates a single index entry for the provided primary key on the foreign key.
|
||||
func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
|
||||
bkt, err := i.indexBucket(tx)
|
||||
|
|
|
@ -0,0 +1,337 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
influxdb "github.com/influxdata/influxdb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
migrationBucket = []byte("migrationsv1")
|
||||
|
||||
// ErrMigrationSpecNotFound is returned when a migration specification is missing
|
||||
// for an already applied migration.
|
||||
ErrMigrationSpecNotFound = errors.New("migration specification not found")
|
||||
)
|
||||
|
||||
// MigrationState is a type for describing the state of a migration.
|
||||
type MigrationState uint
|
||||
|
||||
const (
|
||||
// DownMigrationState is for a migration not yet applied.
|
||||
DownMigrationState MigrationState = iota
|
||||
// UpMigration State is for a migration which has been applied.
|
||||
UpMigrationState
|
||||
)
|
||||
|
||||
// String returns a string representation for a migration state.
|
||||
func (s MigrationState) String() string {
|
||||
switch s {
|
||||
case DownMigrationState:
|
||||
return "down"
|
||||
case UpMigrationState:
|
||||
return "up"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// Migration is a record of a particular migration.
|
||||
type Migration struct {
|
||||
ID influxdb.ID `json:"id"`
|
||||
Name string `json:"name"`
|
||||
State MigrationState `json:"-"`
|
||||
StartedAt *time.Time `json:"started_at"`
|
||||
FinishedAt *time.Time `json:"finished_at,omitempty"`
|
||||
}
|
||||
|
||||
// MigrationSpec is a specification for a particular migration.
|
||||
// It describes the name of the migration and up and down operations
|
||||
// needed to fulfill the migration.
|
||||
type MigrationSpec interface {
|
||||
MigrationName() string
|
||||
Up(ctx context.Context, store Store) error
|
||||
Down(ctx context.Context, store Store) error
|
||||
}
|
||||
|
||||
// MigrationFunc is a function which can be used as either an up or down operation.
|
||||
type MigrationFunc func(context.Context, Store) error
|
||||
|
||||
// AnonymousMigration is a utility type for creating migrations from anonyomous functions.
|
||||
type AnonymousMigration struct {
|
||||
name string
|
||||
up MigrationFunc
|
||||
down MigrationFunc
|
||||
}
|
||||
|
||||
// NewAnonymousMigration constructs a new migration from a name string and an up and a down function.
|
||||
func NewAnonymousMigration(name string, up, down MigrationFunc) AnonymousMigration {
|
||||
return AnonymousMigration{name, up, down}
|
||||
}
|
||||
|
||||
// Name returns the name of the migration.
|
||||
func (a AnonymousMigration) MigrationName() string { return a.name }
|
||||
|
||||
// Up calls the underlying up migration func.
|
||||
func (a AnonymousMigration) Up(ctx context.Context, store Store) error { return a.up(ctx, store) }
|
||||
|
||||
// Down calls the underlying down migration func.
|
||||
func (a AnonymousMigration) Down(ctx context.Context, store Store) error { return a.down(ctx, store) }
|
||||
|
||||
// Migrator is a type which manages migrations.
|
||||
// It takes a list of migration specifications and undo (down) all or apply (up) outstanding migrations.
|
||||
// It records the state of the world in store under the migrations bucket.
|
||||
type Migrator struct {
|
||||
logger *zap.Logger
|
||||
MigrationSpecs []MigrationSpec
|
||||
|
||||
now func() time.Time
|
||||
}
|
||||
|
||||
// NewMigrator constructs and configures a new Migrator.
|
||||
func NewMigrator(logger *zap.Logger, ms ...MigrationSpec) *Migrator {
|
||||
m := &Migrator{
|
||||
logger: logger,
|
||||
now: func() time.Time {
|
||||
return time.Now().UTC()
|
||||
},
|
||||
}
|
||||
m.AddMigrations(ms...)
|
||||
return m
|
||||
}
|
||||
|
||||
// AddMigrations appends the provided migration specs onto the Migrator.
|
||||
func (m *Migrator) AddMigrations(ms ...MigrationSpec) {
|
||||
m.MigrationSpecs = append(m.MigrationSpecs, ms...)
|
||||
}
|
||||
|
||||
// Initialize creates the migration bucket if it does not yet exist.
|
||||
func (m *Migrator) Initialize(ctx context.Context, store Store) error {
|
||||
return store.Update(ctx, func(tx Tx) error {
|
||||
_, err := tx.Bucket(migrationBucket)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// List returns a list of migrations and their states within the provided store.
|
||||
func (m *Migrator) List(ctx context.Context, store Store) (migrations []Migration, _ error) {
|
||||
if err := m.walk(ctx, store, func(id influxdb.ID, m Migration) {
|
||||
migrations = append(migrations, m)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
migrationsLen := len(migrations)
|
||||
for idx, spec := range m.MigrationSpecs[migrationsLen:] {
|
||||
migration := Migration{
|
||||
ID: influxdb.ID(migrationsLen + idx + 1),
|
||||
Name: spec.MigrationName(),
|
||||
}
|
||||
|
||||
migrations = append(migrations, migration)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Up applies each outstanding migration in order.
|
||||
// Migrations are applied in order from the lowest indexed migration in a down state.
|
||||
//
|
||||
// For example, given:
|
||||
// 0001 add bucket foo | (up)
|
||||
// 0002 add bucket bar | (down)
|
||||
// 0003 add index "foo on baz" | (down)
|
||||
//
|
||||
// Up would apply migration 0002 and then 0003.
|
||||
func (m *Migrator) Up(ctx context.Context, store Store) error {
|
||||
wrapErr := func(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("up: %w", err)
|
||||
}
|
||||
|
||||
var lastMigration int
|
||||
if err := m.walk(ctx, store, func(id influxdb.ID, mig Migration) {
|
||||
// we're interested in the last up migration
|
||||
if mig.State == UpMigrationState {
|
||||
lastMigration = int(id)
|
||||
}
|
||||
}); err != nil {
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
for idx, spec := range m.MigrationSpecs[lastMigration:] {
|
||||
startedAt := m.now()
|
||||
migration := Migration{
|
||||
ID: influxdb.ID(lastMigration + idx + 1),
|
||||
Name: spec.MigrationName(),
|
||||
StartedAt: &startedAt,
|
||||
}
|
||||
|
||||
m.logMigrationEvent(UpMigrationState, migration, "started")
|
||||
|
||||
if err := m.putMigration(ctx, store, migration); err != nil {
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
if err := spec.Up(ctx, store); err != nil {
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
finishedAt := m.now()
|
||||
migration.FinishedAt = &finishedAt
|
||||
migration.State = UpMigrationState
|
||||
|
||||
if err := m.putMigration(ctx, store, migration); err != nil {
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
m.logMigrationEvent(UpMigrationState, migration, "completed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Down applies the down operation of each currently applied migration.
|
||||
// Migrations are applied in reverse order from the highest indexed migration in a down state.
|
||||
//
|
||||
// For example, given:
|
||||
// 0001 add bucket foo | (up)
|
||||
// 0002 add bucket bar | (up)
|
||||
// 0003 add index "foo on baz" | (down)
|
||||
//
|
||||
// Down would call down() on 0002 and then on 0001.
|
||||
func (m *Migrator) Down(ctx context.Context, store Store) (err error) {
|
||||
wrapErr := func(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("down: %w", err)
|
||||
}
|
||||
|
||||
var migrations []struct {
|
||||
MigrationSpec
|
||||
Migration
|
||||
}
|
||||
|
||||
if err := m.walk(ctx, store, func(id influxdb.ID, mig Migration) {
|
||||
migrations = append(
|
||||
migrations,
|
||||
struct {
|
||||
MigrationSpec
|
||||
Migration
|
||||
}{
|
||||
m.MigrationSpecs[int(id)-1],
|
||||
mig,
|
||||
},
|
||||
)
|
||||
}); err != nil {
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
for i := len(migrations) - 1; i >= 0; i-- {
|
||||
migration := migrations[i]
|
||||
|
||||
m.logMigrationEvent(DownMigrationState, migration.Migration, "started")
|
||||
|
||||
if err := migration.MigrationSpec.Down(ctx, store); err != nil {
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
if err := m.deleteMigration(ctx, store, migration.Migration); err != nil {
|
||||
return wrapErr(err)
|
||||
}
|
||||
|
||||
m.logMigrationEvent(DownMigrationState, migration.Migration, "completed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Migrator) logMigrationEvent(state MigrationState, mig Migration, event string) {
|
||||
m.logger.Info(fmt.Sprintf("Migration %q %s (%s)", mig.Name, event, state))
|
||||
}
|
||||
|
||||
func (m *Migrator) walk(ctx context.Context, store Store, fn func(id influxdb.ID, m Migration)) error {
|
||||
if err := store.View(ctx, func(tx Tx) error {
|
||||
bkt, err := tx.Bucket(migrationBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cursor, err := bkt.ForwardCursor(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return WalkCursor(ctx, cursor, func(k, v []byte) error {
|
||||
var id influxdb.ID
|
||||
if err := id.Decode(k); err != nil {
|
||||
return fmt.Errorf("decoding migration id: %w", err)
|
||||
}
|
||||
|
||||
var migration Migration
|
||||
if err := json.Unmarshal(v, &migration); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
idx := int(id) - 1
|
||||
if idx >= len(m.MigrationSpecs) {
|
||||
return fmt.Errorf("migration %q: %w", migration.Name, ErrMigrationSpecNotFound)
|
||||
}
|
||||
|
||||
if spec := m.MigrationSpecs[idx]; spec.MigrationName() != migration.Name {
|
||||
return fmt.Errorf("expected migration %q, found %q", spec.MigrationName(), migration.Name)
|
||||
}
|
||||
|
||||
if migration.FinishedAt != nil {
|
||||
migration.State = UpMigrationState
|
||||
}
|
||||
|
||||
fn(id, migration)
|
||||
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return fmt.Errorf("reading migrations: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*Migrator) putMigration(ctx context.Context, store Store, m Migration) error {
|
||||
return store.Update(ctx, func(tx Tx) error {
|
||||
bkt, err := tx.Bucket(migrationBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id, _ := m.ID.Encode()
|
||||
return bkt.Put(id, data)
|
||||
})
|
||||
}
|
||||
|
||||
func (*Migrator) deleteMigration(ctx context.Context, store Store, m Migration) error {
|
||||
return store.Update(ctx, func(tx Tx) error {
|
||||
bkt, err := tx.Bucket(migrationBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id, _ := m.ID.Encode()
|
||||
return bkt.Delete(id)
|
||||
})
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// MigratorSetNow sets the now function on the migrator.
|
||||
// This function is only reachable via tests defined within this
|
||||
// package folder.
|
||||
func MigratorSetNow(migrator *Migrator, now func() time.Time) {
|
||||
migrator.now = now
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package kv_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/inmem"
|
||||
"github.com/influxdata/influxdb/kv"
|
||||
influxdbtesting "github.com/influxdata/influxdb/testing"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func newMigrator(logger *zap.Logger, now influxdbtesting.NowFunc) *kv.Migrator {
|
||||
migrator := kv.NewMigrator(logger)
|
||||
kv.MigratorSetNow(migrator, now)
|
||||
return migrator
|
||||
}
|
||||
|
||||
func Test_Inmem_Migrator(t *testing.T) {
|
||||
influxdbtesting.Migrator(t, inmem.NewKVStore(), newMigrator)
|
||||
}
|
||||
|
||||
func Test_Bolt_Migrator(t *testing.T) {
|
||||
store, closeBolt, err := NewTestBoltStore(t)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create new kv store: %v", err)
|
||||
}
|
||||
defer closeBolt()
|
||||
|
||||
influxdbtesting.Migrator(t, store, newMigrator)
|
||||
}
|
|
@ -43,6 +43,8 @@ type Service struct {
|
|||
checkStore *IndexStore
|
||||
endpointStore *IndexStore
|
||||
variableStore *IndexStore
|
||||
|
||||
Migrator *Migrator
|
||||
}
|
||||
|
||||
// NewService returns an instance of a Service.
|
||||
|
@ -60,8 +62,24 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
|
|||
checkStore: newCheckStore(),
|
||||
endpointStore: newEndpointStore(),
|
||||
variableStore: newVariableStore(),
|
||||
Migrator: NewMigrator(log),
|
||||
}
|
||||
|
||||
// kv service migrations
|
||||
s.Migrator.AddMigrations(
|
||||
// initial migration is the state of the world when
|
||||
// the migrator was introduced.
|
||||
NewAnonymousMigration(
|
||||
"initial migration",
|
||||
s.initializeAll,
|
||||
// down is a noop
|
||||
func(context.Context, Store) error {
|
||||
return nil
|
||||
},
|
||||
),
|
||||
// and new migrations below here (and move this comment down):
|
||||
)
|
||||
|
||||
if len(configs) > 0 {
|
||||
s.Config = configs[0]
|
||||
} else {
|
||||
|
@ -82,9 +100,38 @@ type ServiceConfig struct {
|
|||
Clock clock.Clock
|
||||
}
|
||||
|
||||
// AutoMigrationStore is a Store which also describes whether or not
|
||||
// migrations can be applied automatically.
|
||||
// Given the AutoMigrate method is defined and it returns a non-nil kv.Store
|
||||
// implementation, then it will automatically invoke migrator.Up(store)
|
||||
// on the returned kv.Store during Service.Initialize(...).
|
||||
type AutoMigrationStore interface {
|
||||
Store
|
||||
AutoMigrate() Store
|
||||
}
|
||||
|
||||
// Initialize creates Buckets needed.
|
||||
func (s *Service) Initialize(ctx context.Context) error {
|
||||
return s.kv.Update(ctx, func(tx Tx) error {
|
||||
if err := s.Migrator.Initialize(ctx, s.kv); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if store implements auto migrate and the resulting Store from
|
||||
// AutoMigrate() is non-nil, apply migrator.Up() to the resulting store.
|
||||
if store, ok := s.kv.(AutoMigrationStore); ok {
|
||||
if migrateStore := store.AutoMigrate(); migrateStore != nil {
|
||||
return s.Migrator.Up(ctx, migrateStore)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) initializeAll(ctx context.Context, store Store) error {
|
||||
// please do not initialize anymore buckets here
|
||||
// add them as a new migration to the list of migrations
|
||||
// defined in NewService.
|
||||
if err := store.Update(ctx, func(tx Tx) error {
|
||||
if err := s.initializeAuths(ctx, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -171,8 +218,11 @@ func (s *Service) Initialize(ctx context.Context) error {
|
|||
}
|
||||
|
||||
return s.initializeUsers(ctx, tx)
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithResourceLogger sets the resource audit logger for the service.
|
||||
|
|
|
@ -36,16 +36,11 @@ type someResourceStore struct {
|
|||
ownerIDIndex *kv.Index
|
||||
}
|
||||
|
||||
func newSomeResourceStore(ctx context.Context, store kv.Store) (*someResourceStore, error) {
|
||||
ownerIDIndex := kv.NewIndex(mapping)
|
||||
if err := ownerIDIndex.Initialize(ctx, store); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func newSomeResourceStore(ctx context.Context, store kv.Store) *someResourceStore {
|
||||
return &someResourceStore{
|
||||
store: store,
|
||||
ownerIDIndex: ownerIDIndex,
|
||||
}, nil
|
||||
ownerIDIndex: kv.NewIndex(mapping),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *someResourceStore) FindByOwner(ctx context.Context, ownerID string) (resources []someResource, err error) {
|
||||
|
@ -112,9 +107,9 @@ func TestIndex(t *testing.T, store kv.Store) {
|
|||
|
||||
func testPopulateAndVerify(t *testing.T, store kv.Store) {
|
||||
var (
|
||||
ctx = context.TODO()
|
||||
resources = newNResources(20)
|
||||
resourceStore, err = newSomeResourceStore(ctx, store)
|
||||
ctx = context.TODO()
|
||||
resources = newNResources(20)
|
||||
resourceStore = newSomeResourceStore(ctx, store)
|
||||
)
|
||||
|
||||
// insert 20 resources, but only index the first half
|
||||
|
@ -270,7 +265,7 @@ func testWalk(t *testing.T, store kv.Store) {
|
|||
ctx = context.TODO()
|
||||
resources = newNResources(20)
|
||||
// configure resource store with read disabled
|
||||
resourceStore, err = newSomeResourceStore(ctx, store)
|
||||
resourceStore = newSomeResourceStore(ctx, store)
|
||||
|
||||
cases = []struct {
|
||||
owner string
|
||||
|
@ -324,10 +319,6 @@ func testWalk(t *testing.T, store kv.Store) {
|
|||
}
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// insert all 20 resources with indexing enabled
|
||||
for _, resource := range resources {
|
||||
if err := resourceStore.Create(ctx, resource, true); err != nil {
|
||||
|
|
|
@ -0,0 +1,364 @@
|
|||
package testing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
influxdb "github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/kv"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// NowFunc is a function which returns a time
|
||||
type NowFunc func() time.Time
|
||||
|
||||
// Migrator tests a migrator against a provided store.
|
||||
// The migrator is constructed via a provided constructor function which takes
|
||||
// a logger and a now function used to derive time.
|
||||
func Migrator(t *testing.T, store kv.Store, newMigrator func(*zap.Logger, NowFunc) *kv.Migrator) {
|
||||
var (
|
||||
ctx = context.TODO()
|
||||
logger = zap.NewNop()
|
||||
migrationOne = newMigration("migration one")
|
||||
migrationTwo = newMigration("migration two")
|
||||
migrationThree = newMigration("migration three")
|
||||
migrationFour = newMigration("migration four")
|
||||
|
||||
// mocking now time
|
||||
timestamp = int64(0)
|
||||
now = func() time.Time {
|
||||
timestamp++
|
||||
return time.Unix(timestamp, 0).In(time.UTC)
|
||||
}
|
||||
|
||||
// ts returns a point to a time at N unix seconds.
|
||||
ts = func(n int64) *time.Time {
|
||||
t := time.Unix(n, 0).In(time.UTC)
|
||||
return &t
|
||||
}
|
||||
|
||||
migrator = newMigrator(logger, now)
|
||||
)
|
||||
|
||||
migrator.AddMigrations(
|
||||
// all migrations excluding number four (for now)
|
||||
migrationOne,
|
||||
migrationTwo,
|
||||
migrationThree,
|
||||
)
|
||||
|
||||
if err := migrator.Initialize(ctx, store); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Run("List() shows all migrations in down state", func(t *testing.T) {
|
||||
migrations, err := migrator.List(ctx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if expected := []kv.Migration{
|
||||
{
|
||||
ID: influxdb.ID(1),
|
||||
Name: "migration one",
|
||||
State: kv.DownMigrationState,
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(2),
|
||||
Name: "migration two",
|
||||
State: kv.DownMigrationState,
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(3),
|
||||
Name: "migration three",
|
||||
State: kv.DownMigrationState,
|
||||
},
|
||||
}; !reflect.DeepEqual(expected, migrations) {
|
||||
t.Errorf("expected %#v, found %#v", expected, migrations)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Up() runs each migration in turn", func(t *testing.T) {
|
||||
// apply all migrations
|
||||
if err := migrator.Up(ctx, store); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// list migration again
|
||||
migrations, err := migrator.List(ctx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if expected := []kv.Migration{
|
||||
{
|
||||
ID: influxdb.ID(1),
|
||||
Name: "migration one",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(1),
|
||||
FinishedAt: ts(2),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(2),
|
||||
Name: "migration two",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(3),
|
||||
FinishedAt: ts(4),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(3),
|
||||
Name: "migration three",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(5),
|
||||
FinishedAt: ts(6),
|
||||
},
|
||||
}; !reflect.DeepEqual(expected, migrations) {
|
||||
t.Errorf("expected %#v, found %#v", expected, migrations)
|
||||
}
|
||||
|
||||
// assert each migration was called
|
||||
migrationOne.assertUpCalled(t, 1)
|
||||
migrationTwo.assertUpCalled(t, 1)
|
||||
migrationThree.assertUpCalled(t, 1)
|
||||
})
|
||||
|
||||
t.Run("List() after adding new migration it reports as expected", func(t *testing.T) {
|
||||
migrator.AddMigrations(migrationFour)
|
||||
|
||||
// list migration again
|
||||
migrations, err := migrator.List(ctx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if expected := []kv.Migration{
|
||||
{
|
||||
ID: influxdb.ID(1),
|
||||
Name: "migration one",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(1),
|
||||
FinishedAt: ts(2),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(2),
|
||||
Name: "migration two",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(3),
|
||||
FinishedAt: ts(4),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(3),
|
||||
Name: "migration three",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(5),
|
||||
FinishedAt: ts(6),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(4),
|
||||
Name: "migration four",
|
||||
State: kv.DownMigrationState,
|
||||
},
|
||||
}; !reflect.DeepEqual(expected, migrations) {
|
||||
t.Errorf("expected %#v, found %#v", expected, migrations)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Up() only applies the single down migration", func(t *testing.T) {
|
||||
// apply all migrations
|
||||
if err := migrator.Up(ctx, store); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// list migration again
|
||||
migrations, err := migrator.List(ctx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if expected := []kv.Migration{
|
||||
{
|
||||
ID: influxdb.ID(1),
|
||||
Name: "migration one",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(1),
|
||||
FinishedAt: ts(2),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(2),
|
||||
Name: "migration two",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(3),
|
||||
FinishedAt: ts(4),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(3),
|
||||
Name: "migration three",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(5),
|
||||
FinishedAt: ts(6),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(4),
|
||||
Name: "migration four",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(7),
|
||||
FinishedAt: ts(8),
|
||||
},
|
||||
}; !reflect.DeepEqual(expected, migrations) {
|
||||
t.Errorf("expected %#v, found %#v", expected, migrations)
|
||||
}
|
||||
|
||||
// assert each migration was called only once
|
||||
migrationOne.assertUpCalled(t, 1)
|
||||
migrationTwo.assertUpCalled(t, 1)
|
||||
migrationThree.assertUpCalled(t, 1)
|
||||
migrationFour.assertUpCalled(t, 1)
|
||||
})
|
||||
|
||||
t.Run("Down() calls down for each migration", func(t *testing.T) {
|
||||
// apply all migrations
|
||||
if err := migrator.Down(ctx, store); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// list migration again
|
||||
migrations, err := migrator.List(ctx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if expected := []kv.Migration{
|
||||
{
|
||||
ID: influxdb.ID(1),
|
||||
Name: "migration one",
|
||||
State: kv.DownMigrationState,
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(2),
|
||||
Name: "migration two",
|
||||
State: kv.DownMigrationState,
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(3),
|
||||
Name: "migration three",
|
||||
State: kv.DownMigrationState,
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(4),
|
||||
Name: "migration four",
|
||||
State: kv.DownMigrationState,
|
||||
},
|
||||
}; !reflect.DeepEqual(expected, migrations) {
|
||||
t.Errorf("expected %#v, found %#v", expected, migrations)
|
||||
}
|
||||
|
||||
// assert each migration was called only once
|
||||
migrationOne.assertDownCalled(t, 1)
|
||||
migrationTwo.assertDownCalled(t, 1)
|
||||
migrationThree.assertDownCalled(t, 1)
|
||||
migrationFour.assertDownCalled(t, 1)
|
||||
})
|
||||
|
||||
t.Run("Up() re-applies all migrations", func(t *testing.T) {
|
||||
// apply all migrations
|
||||
if err := migrator.Up(ctx, store); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// list migration again
|
||||
migrations, err := migrator.List(ctx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if expected := []kv.Migration{
|
||||
{
|
||||
ID: influxdb.ID(1),
|
||||
Name: "migration one",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(9),
|
||||
FinishedAt: ts(10),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(2),
|
||||
Name: "migration two",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(11),
|
||||
FinishedAt: ts(12),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(3),
|
||||
Name: "migration three",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(13),
|
||||
FinishedAt: ts(14),
|
||||
},
|
||||
{
|
||||
ID: influxdb.ID(4),
|
||||
Name: "migration four",
|
||||
State: kv.UpMigrationState,
|
||||
StartedAt: ts(15),
|
||||
FinishedAt: ts(16),
|
||||
},
|
||||
}; !reflect.DeepEqual(expected, migrations) {
|
||||
t.Errorf("expected %#v, found %#v", expected, migrations)
|
||||
}
|
||||
|
||||
// assert each migration up was called for a second time
|
||||
migrationOne.assertUpCalled(t, 2)
|
||||
migrationTwo.assertUpCalled(t, 2)
|
||||
migrationThree.assertUpCalled(t, 2)
|
||||
migrationFour.assertUpCalled(t, 2)
|
||||
})
|
||||
|
||||
t.Run("List() missing migration spec errors as expected", func(t *testing.T) {
|
||||
// remove last specification from migration list
|
||||
migrator.MigrationSpecs = migrator.MigrationSpecs[:len(migrator.MigrationSpecs)-1]
|
||||
// list migration again
|
||||
_, err := migrator.List(ctx, store)
|
||||
if !errors.Is(err, kv.ErrMigrationSpecNotFound) {
|
||||
t.Errorf("expected migration spec error, found %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func newMigration(name string) *spyMigrationSpec {
|
||||
return &spyMigrationSpec{name: name}
|
||||
}
|
||||
|
||||
type spyMigrationSpec struct {
|
||||
name string
|
||||
upCalled int
|
||||
downCalled int
|
||||
}
|
||||
|
||||
func (s *spyMigrationSpec) MigrationName() string {
|
||||
return s.name
|
||||
}
|
||||
|
||||
func (s *spyMigrationSpec) assertUpCalled(t *testing.T, times int) {
|
||||
t.Helper()
|
||||
if s.upCalled != times {
|
||||
t.Errorf("expected Up() to be called %d times, instead found %d times", times, s.upCalled)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *spyMigrationSpec) Up(ctx context.Context, store kv.Store) error {
|
||||
s.upCalled++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *spyMigrationSpec) assertDownCalled(t *testing.T, times int) {
|
||||
t.Helper()
|
||||
if s.downCalled != times {
|
||||
t.Errorf("expected Down() to be called %d times, instead found %d times", times, s.downCalled)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *spyMigrationSpec) Down(ctx context.Context, store kv.Store) error {
|
||||
s.downCalled++
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue