feat: take a backup of KV/SQL metadata stores before applying migrations (#22765)
parent
88afa9229b
commit
335b74b25f
|
@ -1018,8 +1018,9 @@ func (m *Launcher) openMetaStores(ctx context.Context, opts *InfluxdOpts) (strin
|
||||||
m.flushers = append(m.flushers, kvStore, sqlStore)
|
m.flushers = append(m.flushers, kvStore, sqlStore)
|
||||||
}
|
}
|
||||||
|
|
||||||
boltMigrator, err := migration.NewMigrator(
|
// Apply migrations to the KV and SQL metadata stores.
|
||||||
m.log.With(zap.String("service", "bolt migrations")),
|
kvMigrator, err := migration.NewMigrator(
|
||||||
|
m.log.With(zap.String("service", "KV migrations")),
|
||||||
kvStore,
|
kvStore,
|
||||||
all.Migrations[:]...,
|
all.Migrations[:]...,
|
||||||
)
|
)
|
||||||
|
@ -1027,18 +1028,21 @@ func (m *Launcher) openMetaStores(ctx context.Context, opts *InfluxdOpts) (strin
|
||||||
m.log.Error("Failed to initialize kv migrator", zap.Error(err))
|
m.log.Error("Failed to initialize kv migrator", zap.Error(err))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
sqlMigrator := sqlite.NewMigrator(sqlStore, m.log.With(zap.String("service", "SQL migrations")))
|
||||||
|
|
||||||
// apply migrations to the bolt metadata store
|
// If we're migrating a persistent data store, take a backup of the pre-migration state for rollback.
|
||||||
if err := boltMigrator.Up(ctx); err != nil {
|
if opts.StoreType == DiskStore || opts.StoreType == BoltStore {
|
||||||
m.log.Error("Failed to apply bolt migrations", zap.Error(err))
|
backupPattern := "%s.pre-%s-upgrade.backup"
|
||||||
|
info := platform.GetBuildInfo()
|
||||||
|
kvMigrator.SetBackupPath(fmt.Sprintf(backupPattern, opts.BoltPath, info.Version))
|
||||||
|
sqlMigrator.SetBackupPath(fmt.Sprintf(backupPattern, opts.SqLitePath, info.Version))
|
||||||
|
}
|
||||||
|
if err := kvMigrator.Up(ctx); err != nil {
|
||||||
|
m.log.Error("Failed to apply KV migrations", zap.Error(err))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
if err := sqlMigrator.Up(ctx, sqliteMigrations.All); err != nil {
|
||||||
sqliteMigrator := sqlite.NewMigrator(sqlStore, m.log.With(zap.String("service", "sqlite migrations")))
|
m.log.Error("Failed to apply SQL migrations", zap.Error(err))
|
||||||
|
|
||||||
// apply migrations to the sqlite metadata store
|
|
||||||
if err := sqliteMigrator.Up(ctx, sqliteMigrations.All); err != nil {
|
|
||||||
m.log.Error("Failed to apply sqlite migrations", zap.Error(err))
|
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/kit/platform"
|
"github.com/influxdata/influxdb/v2/kit/platform"
|
||||||
|
@ -72,6 +73,7 @@ type Migrator struct {
|
||||||
Specs []Spec
|
Specs []Spec
|
||||||
|
|
||||||
now func() time.Time
|
now func() time.Time
|
||||||
|
backupPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMigrator constructs and configures a new Migrator.
|
// NewMigrator constructs and configures a new Migrator.
|
||||||
|
@ -99,6 +101,11 @@ func (m *Migrator) AddMigrations(ms ...Spec) {
|
||||||
m.Specs = append(m.Specs, ms...)
|
m.Specs = append(m.Specs, ms...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetBackupPath records the filepath where pre-migration state should be written prior to running migrations.
|
||||||
|
func (m *Migrator) SetBackupPath(path string) {
|
||||||
|
m.backupPath = path
|
||||||
|
}
|
||||||
|
|
||||||
// List returns a list of migrations and their states within the provided store.
|
// List returns a list of migrations and their states within the provided store.
|
||||||
func (m *Migrator) List(ctx context.Context) (migrations []Migration, _ error) {
|
func (m *Migrator) List(ctx context.Context) (migrations []Migration, _ error) {
|
||||||
if err := m.walk(ctx, m.store, func(id platform.ID, m Migration) {
|
if err := m.walk(ctx, m.store, func(id platform.ID, m Migration) {
|
||||||
|
@ -149,10 +156,29 @@ func (m *Migrator) Up(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
migrationsToDo := len(m.Specs[lastMigration:])
|
migrationsToDo := len(m.Specs[lastMigration:])
|
||||||
if migrationsToDo > 0 {
|
if migrationsToDo == 0 {
|
||||||
m.logger.Info("Bringing up metadata migrations", zap.Int("migration_count", migrationsToDo))
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if m.backupPath != "" && lastMigration != 0 {
|
||||||
|
m.logger.Info("Backing up pre-migration metadata", zap.String("backup_path", m.backupPath))
|
||||||
|
if err := func() error {
|
||||||
|
out, err := os.Create(m.backupPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer out.Close()
|
||||||
|
|
||||||
|
if err := m.store.Backup(ctx, out); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
return fmt.Errorf("failed to back up pre-migration metadata: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m.logger.Info("Bringing up metadata migrations", zap.Int("migration_count", migrationsToDo))
|
||||||
for idx, spec := range m.Specs[lastMigration:] {
|
for idx, spec := range m.Specs[lastMigration:] {
|
||||||
startedAt := m.now()
|
startedAt := m.now()
|
||||||
migration := Migration{
|
migration := Migration{
|
||||||
|
|
|
@ -3,15 +3,19 @@ package migration_test
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/bolt"
|
"github.com/influxdata/influxdb/v2/bolt"
|
||||||
"github.com/influxdata/influxdb/v2/inmem"
|
"github.com/influxdata/influxdb/v2/inmem"
|
||||||
"github.com/influxdata/influxdb/v2/kv"
|
"github.com/influxdata/influxdb/v2/kv"
|
||||||
"github.com/influxdata/influxdb/v2/kv/migration"
|
"github.com/influxdata/influxdb/v2/kv/migration"
|
||||||
|
"github.com/influxdata/influxdb/v2/kv/migration/all"
|
||||||
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
|
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
@ -40,7 +44,65 @@ func Test_Bolt_Migrator(t *testing.T) {
|
||||||
influxdbtesting.Migrator(t, store, newMigrator)
|
influxdbtesting.Migrator(t, store, newMigrator)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestBoltStoreWithoutMigrations(t *testing.T) (kv.SchemaStore, func(), error) {
|
func Test_Bolt_MigratorWithBackup(t *testing.T) {
|
||||||
|
store, closeBolt, err := newTestBoltStoreWithoutMigrations(t)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer closeBolt()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
migrator := newMigrator(t, zaptest.NewLogger(t), store, time.Now)
|
||||||
|
backupPath := fmt.Sprintf("%s.bak", store.DB().Path())
|
||||||
|
migrator.SetBackupPath(backupPath)
|
||||||
|
|
||||||
|
// Run the first migration.
|
||||||
|
migrator.AddMigrations(all.Migration0001_InitialMigration)
|
||||||
|
require.NoError(t, migrator.Up(ctx))
|
||||||
|
|
||||||
|
// List of applied migrations should now have length 1.
|
||||||
|
ms, err := migrator.List(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(ms))
|
||||||
|
|
||||||
|
// Backup file shouldn't exist because there was no previous state to back up.
|
||||||
|
_, err = os.Stat(backupPath)
|
||||||
|
require.True(t, os.IsNotExist(err))
|
||||||
|
|
||||||
|
// Run a few more migrations.
|
||||||
|
migrator.AddMigrations(all.Migrations[1:5]...)
|
||||||
|
require.NoError(t, migrator.Up(ctx))
|
||||||
|
|
||||||
|
// List of applied migrations should now have length 5.
|
||||||
|
ms, err = migrator.List(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 5, len(ms))
|
||||||
|
|
||||||
|
// Backup file should now exist.
|
||||||
|
_, err = os.Stat(backupPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Open a 2nd store using the backup file.
|
||||||
|
backupStore := bolt.NewKVStore(zaptest.NewLogger(t), backupPath, bolt.WithNoSync)
|
||||||
|
require.NoError(t, backupStore.Open(ctx))
|
||||||
|
defer backupStore.Close()
|
||||||
|
|
||||||
|
// List of applied migrations in the backup should be 1.
|
||||||
|
backupMigrator := newMigrator(t, zaptest.NewLogger(t), backupStore, time.Now)
|
||||||
|
backupMigrator.AddMigrations(all.Migration0001_InitialMigration)
|
||||||
|
backupMs, err := backupMigrator.List(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(backupMs))
|
||||||
|
|
||||||
|
// Run the other migrations on the backup.
|
||||||
|
backupMigrator.AddMigrations(all.Migrations[1:5]...)
|
||||||
|
require.NoError(t, backupMigrator.Up(ctx))
|
||||||
|
|
||||||
|
// List of applied migrations in the backup should be 5.
|
||||||
|
backupMs, err = backupMigrator.List(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 5, len(backupMs))
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestBoltStoreWithoutMigrations(t *testing.T) (*bolt.KVStore, func(), error) {
|
||||||
f, err := ioutil.TempFile("", "influxdata-bolt-")
|
f, err := ioutil.TempFile("", "influxdata-bolt-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, errors.New("unable to open temporary boltdb file")
|
return nil, nil, errors.New("unable to open temporary boltdb file")
|
||||||
|
|
|
@ -3,6 +3,8 @@ package sqlite
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"embed"
|
"embed"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -13,6 +15,8 @@ import (
|
||||||
type Migrator struct {
|
type Migrator struct {
|
||||||
store *SqlStore
|
store *SqlStore
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
|
||||||
|
backupPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMigrator(store *SqlStore, log *zap.Logger) *Migrator {
|
func NewMigrator(store *SqlStore, log *zap.Logger) *Migrator {
|
||||||
|
@ -22,6 +26,11 @@ func NewMigrator(store *SqlStore, log *zap.Logger) *Migrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetBackupPath records the filepath where pre-migration state should be written prior to running migrations.
|
||||||
|
func (m *Migrator) SetBackupPath(path string) {
|
||||||
|
m.backupPath = path
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Migrator) Up(ctx context.Context, source embed.FS) error {
|
func (m *Migrator) Up(ctx context.Context, source embed.FS) error {
|
||||||
list, err := source.ReadDir(".")
|
list, err := source.ReadDir(".")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -44,11 +53,30 @@ func (m *Migrator) Up(ctx context.Context, source embed.FS) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// log this message only if there are migrations to run
|
if final == current {
|
||||||
if final > current {
|
return nil
|
||||||
m.log.Info("Bringing up metadata migrations", zap.Int("migration_count", final-current))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if m.backupPath != "" && current != 0 {
|
||||||
|
m.log.Info("Backing up pre-migration metadata", zap.String("backup_path", m.backupPath))
|
||||||
|
if err := func() error {
|
||||||
|
out, err := os.Create(m.backupPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer out.Close()
|
||||||
|
|
||||||
|
if err := m.store.BackupSqlStore(ctx, out); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}(); err != nil {
|
||||||
|
return fmt.Errorf("failed to back up pre-migration metadata: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m.log.Info("Bringing up metadata migrations", zap.Int("migration_count", final-current))
|
||||||
|
|
||||||
for _, f := range list {
|
for _, f := range list {
|
||||||
n := f.Name()
|
n := f.Name()
|
||||||
// get the version of this migration script
|
// get the version of this migration script
|
||||||
|
|
|
@ -2,11 +2,14 @@ package sqlite
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb/v2/kit/errors"
|
"github.com/influxdata/influxdb/v2/kit/errors"
|
||||||
"github.com/influxdata/influxdb/v2/sqlite/test_migrations"
|
"github.com/influxdata/influxdb/v2/sqlite/test_migrations"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -54,6 +57,62 @@ func TestUp(t *testing.T) {
|
||||||
require.Equal(t, "user_id", table2Info[0].Name)
|
require.Equal(t, "user_id", table2Info[0].Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpWithBackups(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
store, clean := NewTestStore(t)
|
||||||
|
defer clean(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
logger := zaptest.NewLogger(t)
|
||||||
|
migrator := NewMigrator(store, logger)
|
||||||
|
backupPath := fmt.Sprintf("%s.bak", store.path)
|
||||||
|
migrator.SetBackupPath(backupPath)
|
||||||
|
|
||||||
|
// Run the first migration.
|
||||||
|
require.NoError(t, migrator.Up(ctx, test_migrations.First))
|
||||||
|
|
||||||
|
// user_version should now be 1.
|
||||||
|
v, err := store.userVersion()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, v)
|
||||||
|
|
||||||
|
// Backup file shouldn't exist, because there was nothing to back up.
|
||||||
|
_, err = os.Stat(backupPath)
|
||||||
|
require.True(t, os.IsNotExist(err))
|
||||||
|
|
||||||
|
// Run the remaining migrations.
|
||||||
|
require.NoError(t, migrator.Up(ctx, test_migrations.Rest))
|
||||||
|
|
||||||
|
// user_version should now be 3.
|
||||||
|
v, err = store.userVersion()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 3, v)
|
||||||
|
|
||||||
|
// Backup file should now exist.
|
||||||
|
_, err = os.Stat(backupPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Open a 2nd store using the backup file.
|
||||||
|
backupStore, err := NewSqlStore(backupPath, zap.NewNop())
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer backupStore.Close()
|
||||||
|
|
||||||
|
// user_version should be 1 in the backup.
|
||||||
|
v, err = backupStore.userVersion()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, v)
|
||||||
|
|
||||||
|
// Run the remaining migrations on the backup.
|
||||||
|
backupMigrator := NewMigrator(backupStore, logger)
|
||||||
|
require.NoError(t, backupMigrator.Up(ctx, test_migrations.Rest))
|
||||||
|
|
||||||
|
// user_version should now be 3 in the backup.
|
||||||
|
v, err = backupStore.userVersion()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 3, v)
|
||||||
|
}
|
||||||
|
|
||||||
func TestScriptVersion(t *testing.T) {
|
func TestScriptVersion(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|
|
@ -4,3 +4,9 @@ import "embed"
|
||||||
|
|
||||||
//go:embed *.sql
|
//go:embed *.sql
|
||||||
var All embed.FS
|
var All embed.FS
|
||||||
|
|
||||||
|
//go:embed 0001_create_test_table_1.sql
|
||||||
|
var First embed.FS
|
||||||
|
|
||||||
|
//go:embed 0002_rename_test_table_id_1.sql 0003_create_test_table_2.sql
|
||||||
|
var Rest embed.FS
|
||||||
|
|
Loading…
Reference in New Issue