feat: sql migrator can do down migrations (#22806)
* feat: sql down migrations * refactor: different name for up migrations * chore: update migrations ref in svc tests * build: add lint step to verify sql migration names matchpull/22813/head
parent
a4fd1caf8e
commit
f7573f43a7
|
@ -461,6 +461,10 @@ jobs:
|
|||
name: Check flag generation
|
||||
command: ./scripts/ci/lint/flags.bash
|
||||
when: always
|
||||
- run:
|
||||
name: Check SQL migrations
|
||||
command: make checksqlmigrations
|
||||
when: always
|
||||
- run:
|
||||
name: Check formatting
|
||||
command: make checkfmt
|
||||
|
|
3
Makefile
3
Makefile
|
@ -115,6 +115,9 @@ checktidy:
|
|||
checkgenerate:
|
||||
./etc/checkgenerate.sh
|
||||
|
||||
checksqlmigrations:
|
||||
./etc/check-sql-migrations.sh
|
||||
|
||||
# generate-web-assets outputs all the files needed to link the UI to the back-end.
|
||||
# Currently, none of these files are tracked by git.
|
||||
generate-web-assets: static/static_gen.go
|
||||
|
|
|
@ -1028,7 +1028,7 @@ func newTestService(t *testing.T) (*Service, func(t *testing.T)) {
|
|||
ctx := context.Background()
|
||||
|
||||
sqliteMigrator := sqlite.NewMigrator(store, zap.NewNop())
|
||||
err := sqliteMigrator.Up(ctx, migrations.All)
|
||||
err := sqliteMigrator.Up(ctx, migrations.AllUp)
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := NewService(store)
|
||||
|
|
|
@ -1041,7 +1041,7 @@ func (m *Launcher) openMetaStores(ctx context.Context, opts *InfluxdOpts) (strin
|
|||
m.log.Error("Failed to apply KV migrations", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
if err := sqlMigrator.Up(ctx, sqliteMigrations.All); err != nil {
|
||||
if err := sqlMigrator.Up(ctx, sqliteMigrations.AllUp); err != nil {
|
||||
m.log.Error("Failed to apply SQL migrations", zap.Error(err))
|
||||
return "", err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
#!/bin/bash
|
||||
|
||||
# This script verifies that for all SQL migrations there is both an "up" and a "down", and that the file names match.
|
||||
|
||||
upMigrations=$(find sqlite/migrations/*.up.sql | cut -f 1 -d '.')
|
||||
downMigrations=$(find sqlite/migrations/*.down.sql | cut -f 1 -d '.')
|
||||
|
||||
differences="$(diff -y --suppress-common-lines <(echo "$upMigrations" ) <(echo "$downMigrations"))"
|
||||
|
||||
if [[ -n ${differences} ]]
|
||||
then
|
||||
echo '------------------------------------------------------------------------------------'
|
||||
echo "Problem detected with SQL migration files: Up and Down migration names do not match!"
|
||||
echo '------------------------------------------------------------------------------------'
|
||||
echo "Diff: Up Migrations without Down Migrations vs. Down Migrations without Up Migrations:"
|
||||
echo "$differences"
|
||||
exit 1
|
||||
fi
|
|
@ -200,7 +200,7 @@ func newTestService(t *testing.T) (*Service, func(t *testing.T)) {
|
|||
ctx := context.Background()
|
||||
|
||||
sqliteMigrator := sqlite.NewMigrator(store, zap.NewNop())
|
||||
err := sqliteMigrator.Up(ctx, migrations.All)
|
||||
err := sqliteMigrator.Up(ctx, migrations.AllUp)
|
||||
require.NoError(t, err)
|
||||
|
||||
svc := NewService(store)
|
||||
|
|
|
@ -327,7 +327,7 @@ func newTestService(t *testing.T) (*service, *remotesMock.MockRemoteConnectionVa
|
|||
store, clean := sqlite.NewTestStore(t)
|
||||
logger := zaptest.NewLogger(t)
|
||||
sqliteMigrator := sqlite.NewMigrator(store, logger)
|
||||
require.NoError(t, sqliteMigrator.Up(ctx, migrations.All))
|
||||
require.NoError(t, sqliteMigrator.Up(ctx, migrations.AllUp))
|
||||
|
||||
mockValidator := remotesMock.NewMockRemoteConnectionValidator(gomock.NewController(t))
|
||||
svc := service{
|
||||
|
|
|
@ -578,7 +578,7 @@ func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) {
|
|||
store, clean := sqlite.NewTestStore(t)
|
||||
logger := zaptest.NewLogger(t)
|
||||
sqliteMigrator := sqlite.NewMigrator(store, logger)
|
||||
require.NoError(t, sqliteMigrator.Up(ctx, migrations.All))
|
||||
require.NoError(t, sqliteMigrator.Up(ctx, migrations.AllUp))
|
||||
|
||||
// Make sure foreign-key checking is enabled.
|
||||
_, err := store.DB.Exec("PRAGMA foreign_keys = ON;")
|
||||
|
|
|
@ -46,9 +46,13 @@ table called "migrations". If records of migrations exist in the "migrations"
|
|||
table that are not embedded in the binary, an error will be raised on startup.
|
||||
|
||||
When creating new migrations, follow the file naming convention established by
|
||||
existing migration scripts, which should look like `00XX_script_name.sql`, where
|
||||
`XX` is the version number. New scripts should have the version number
|
||||
incremented by 1.
|
||||
existing migration scripts, which should look like `00XX_script_name.up.sql` &
|
||||
`00xx_script_name.down.sql` for the "up" and "down" migration, where `XX` is the
|
||||
version number. New scripts should have the version number incremented by 1.
|
||||
|
||||
The "up" migrations are run when starting the influx daemon and when metadata
|
||||
backups are restored. The "down" migrations are run with the `influxd downgrade`
|
||||
command.
|
||||
|
||||
### In-Memory Database
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE migrations;
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE notebooks;
|
|
@ -0,0 +1,2 @@
|
|||
DROP TABLE streams;
|
||||
DROP TABLE annotations;
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE remotes;
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE replications;
|
|
@ -2,5 +2,8 @@ package migrations
|
|||
|
||||
import "embed"
|
||||
|
||||
//go:embed *.sql
|
||||
var All embed.FS
|
||||
//go:embed *up.sql
|
||||
var AllUp embed.FS
|
||||
|
||||
//go:embed *down.sql
|
||||
var AllDown embed.FS
|
||||
|
|
|
@ -94,7 +94,7 @@ func (m *Migrator) Up(ctx context.Context, source embed.FS) error {
|
|||
return err
|
||||
}
|
||||
|
||||
recordStmt := fmt.Sprintf(`INSERT INTO migrations (name) VALUES (%q);`, dropExtension(n))
|
||||
recordStmt := fmt.Sprintf(`INSERT INTO %s (name) VALUES (%q);`, migrationsTableName, dropExtension(n))
|
||||
|
||||
if err := m.store.execTrans(ctx, string(mBytes)+recordStmt); err != nil {
|
||||
return err
|
||||
|
@ -105,6 +105,74 @@ func (m *Migrator) Up(ctx context.Context, source embed.FS) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Down applies the "down" migrations until the SQL database has migrations only >= untilMigration. Use untilMigration = 0 to apply all
|
||||
// down migrations, which will delete all data from the database.
|
||||
func (m *Migrator) Down(ctx context.Context, untilMigration int, source embed.FS) error {
|
||||
knownMigrations, err := source.ReadDir(".")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// sort the list according to the version number to ensure the migrations are applied in the correct order
|
||||
sort.Slice(knownMigrations, func(i, j int) bool {
|
||||
return knownMigrations[i].Name() < knownMigrations[j].Name()
|
||||
})
|
||||
|
||||
executedMigrations, err := m.store.allMigrationNames()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
migrationsToDo := len(executedMigrations) - untilMigration
|
||||
if migrationsToDo == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if migrationsToDo < 0 {
|
||||
m.log.Warn("SQL metadata is already on a schema older than target, nothing to do")
|
||||
return nil
|
||||
}
|
||||
|
||||
if m.backupPath != "" {
|
||||
m.log.Info("Backing up pre-migration metadata", zap.String("backup_path", m.backupPath))
|
||||
if err := func() error {
|
||||
out, err := os.Create(m.backupPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
|
||||
if err := m.store.BackupSqlStore(ctx, out); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return fmt.Errorf("failed to back up pre-migration metadata: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
m.log.Info("Tearing down metadata migrations", zap.Int("migration_count", migrationsToDo))
|
||||
|
||||
for i := len(executedMigrations) - 1; i >= untilMigration; i-- {
|
||||
downName := knownMigrations[i].Name()
|
||||
downNameNoExtension := dropExtension(downName)
|
||||
|
||||
m.log.Debug("Executing metadata migration", zap.String("migration_name", downName))
|
||||
mBytes, err := source.ReadFile(downName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deleteStmt := fmt.Sprintf(`DELETE FROM %s WHERE name = %q;`, migrationsTableName, downNameNoExtension)
|
||||
|
||||
if err := m.store.execTrans(ctx, deleteStmt+string(mBytes)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// extract the version number as an integer from a file named like "0002_migration_name.sql"
|
||||
func scriptVersion(filename string) (int, error) {
|
||||
vString := strings.Split(filename, "_")[0]
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"embed"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
|
@ -29,7 +30,14 @@ func TestUp(t *testing.T) {
|
|||
|
||||
store, clean := NewTestStore(t)
|
||||
defer clean(t)
|
||||
ctx := context.Background()
|
||||
|
||||
upsOnlyAll, err := test_migrations.AllUp.ReadDir(".")
|
||||
require.NoError(t, err)
|
||||
|
||||
upsOnlyFirst, err := test_migrations.FirstUp.ReadDir(".")
|
||||
require.NoError(t, err)
|
||||
|
||||
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
||||
|
||||
// empty db contains no migrations
|
||||
names, err := store.allMigrationNames()
|
||||
|
@ -37,18 +45,10 @@ func TestUp(t *testing.T) {
|
|||
require.Equal(t, []string(nil), names)
|
||||
|
||||
// run the first migrations
|
||||
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
||||
require.NoError(t, migrator.Up(ctx, test_migrations.First))
|
||||
names, err = store.allMigrationNames()
|
||||
require.NoError(t, err)
|
||||
migrationNamesMatch(t, names, test_migrations.First)
|
||||
migrateUpAndCheck(t, migrator, store, test_migrations.FirstUp, upsOnlyFirst)
|
||||
|
||||
// run the rest of the migrations
|
||||
err = migrator.Up(ctx, test_migrations.All)
|
||||
require.NoError(t, err)
|
||||
names, err = store.allMigrationNames()
|
||||
require.NoError(t, err)
|
||||
migrationNamesMatch(t, names, test_migrations.All)
|
||||
migrateUpAndCheck(t, migrator, store, test_migrations.AllUp, upsOnlyAll)
|
||||
|
||||
// test_table_1 had the "id" column renamed to "org_id"
|
||||
var table1Info []*tableInfo
|
||||
|
@ -76,7 +76,7 @@ func TestUpErrors(t *testing.T) {
|
|||
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
||||
require.NoError(t, migrator.Up(ctx, test_migrations.MigrationTable))
|
||||
require.NoError(t, store.execTrans(ctx, `INSERT INTO migrations (name) VALUES ("0010_some_bad_migration")`))
|
||||
require.Equal(t, migration.ErrInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.All))
|
||||
require.Equal(t, migration.ErrInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.AllUp))
|
||||
})
|
||||
|
||||
t.Run("known + unknown migrations exist", func(t *testing.T) {
|
||||
|
@ -85,9 +85,9 @@ func TestUpErrors(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
|
||||
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
||||
require.NoError(t, migrator.Up(ctx, test_migrations.First))
|
||||
require.NoError(t, migrator.Up(ctx, test_migrations.FirstUp))
|
||||
require.NoError(t, store.execTrans(ctx, `INSERT INTO migrations (name) VALUES ("0010_some_bad_migration")`))
|
||||
require.Equal(t, migration.ErrInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.All))
|
||||
require.Equal(t, migration.ErrInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.AllUp))
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -96,28 +96,27 @@ func TestUpWithBackups(t *testing.T) {
|
|||
|
||||
store, clean := NewTestStore(t)
|
||||
defer clean(t)
|
||||
ctx := context.Background()
|
||||
|
||||
logger := zaptest.NewLogger(t)
|
||||
migrator := NewMigrator(store, logger)
|
||||
backupPath := fmt.Sprintf("%s.bak", store.path)
|
||||
migrator.SetBackupPath(backupPath)
|
||||
|
||||
// Run the first migrations.
|
||||
require.NoError(t, migrator.Up(ctx, test_migrations.First))
|
||||
names, err := store.allMigrationNames()
|
||||
upsOnlyAll, err := test_migrations.AllUp.ReadDir(".")
|
||||
require.NoError(t, err)
|
||||
migrationNamesMatch(t, names, test_migrations.First)
|
||||
|
||||
upsOnlyFirst, err := test_migrations.FirstUp.ReadDir(".")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Run the first migrations.
|
||||
migrateUpAndCheck(t, migrator, store, test_migrations.FirstUp, upsOnlyFirst)
|
||||
|
||||
// Backup file shouldn't exist, because there was nothing to back up.
|
||||
_, err = os.Stat(backupPath)
|
||||
require.True(t, os.IsNotExist(err))
|
||||
|
||||
// Run the remaining migrations.
|
||||
require.NoError(t, migrator.Up(ctx, test_migrations.All))
|
||||
names, err = store.allMigrationNames()
|
||||
require.NoError(t, err)
|
||||
migrationNamesMatch(t, names, test_migrations.All)
|
||||
migrateUpAndCheck(t, migrator, store, test_migrations.AllUp, upsOnlyAll)
|
||||
|
||||
// Backup file should now exist.
|
||||
_, err = os.Stat(backupPath)
|
||||
|
@ -131,16 +130,45 @@ func TestUpWithBackups(t *testing.T) {
|
|||
// Backup store contains the first migrations records.
|
||||
backupNames, err := backupStore.allMigrationNames()
|
||||
require.NoError(t, err)
|
||||
migrationNamesMatch(t, backupNames, test_migrations.First)
|
||||
migrationNamesMatch(t, backupNames, upsOnlyFirst)
|
||||
|
||||
// Run the remaining migrations on the backup.
|
||||
// Run the remaining migrations on the backup and verify that it now contains the rest of the migration records.
|
||||
backupMigrator := NewMigrator(backupStore, logger)
|
||||
require.NoError(t, backupMigrator.Up(ctx, test_migrations.All))
|
||||
migrateUpAndCheck(t, backupMigrator, store, test_migrations.AllUp, upsOnlyAll)
|
||||
}
|
||||
|
||||
// Backup store now contains the rest of the migration records.
|
||||
backupNames, err = backupStore.allMigrationNames()
|
||||
func TestDown(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
store, clean := NewTestStore(t)
|
||||
defer clean(t)
|
||||
|
||||
upsOnlyAll, err := test_migrations.AllUp.ReadDir(".")
|
||||
require.NoError(t, err)
|
||||
migrationNamesMatch(t, backupNames, test_migrations.All)
|
||||
|
||||
upsOnlyFirst, err := test_migrations.FirstUp.ReadDir(".")
|
||||
require.NoError(t, err)
|
||||
|
||||
migrator := NewMigrator(store, zaptest.NewLogger(t))
|
||||
|
||||
// no up migrations, then some down migrations
|
||||
migrateDownAndCheck(t, migrator, store, test_migrations.FirstDown, []fs.DirEntry{}, 0)
|
||||
|
||||
// all up migrations, then all down migrations
|
||||
migrateUpAndCheck(t, migrator, store, test_migrations.AllUp, upsOnlyAll)
|
||||
migrateDownAndCheck(t, migrator, store, test_migrations.AllDown, []fs.DirEntry{}, 0)
|
||||
|
||||
// first of the up migrations, then first of the down migrations
|
||||
migrateUpAndCheck(t, migrator, store, test_migrations.FirstUp, upsOnlyFirst)
|
||||
migrateDownAndCheck(t, migrator, store, test_migrations.FirstDown, []fs.DirEntry{}, 0)
|
||||
|
||||
// first of the up migrations, then all of the down migrations
|
||||
migrateUpAndCheck(t, migrator, store, test_migrations.FirstUp, upsOnlyFirst)
|
||||
migrateDownAndCheck(t, migrator, store, test_migrations.AllDown, []fs.DirEntry{}, 0)
|
||||
|
||||
// all up migrations, then some of the down migrations (using untilMigration)
|
||||
migrateUpAndCheck(t, migrator, store, test_migrations.AllUp, upsOnlyAll)
|
||||
migrateDownAndCheck(t, migrator, store, test_migrations.AllDown, upsOnlyFirst, 2)
|
||||
}
|
||||
|
||||
func TestScriptVersion(t *testing.T) {
|
||||
|
@ -216,13 +244,30 @@ func TestDropExtension(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func migrationNamesMatch(t *testing.T, names []string, files embed.FS) {
|
||||
func migrateUpAndCheck(t *testing.T, m *Migrator, s *SqlStore, source embed.FS, expected []fs.DirEntry) {
|
||||
t.Helper()
|
||||
storedMigrations, err := files.ReadDir(".")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(storedMigrations), len(names))
|
||||
|
||||
for idx := range storedMigrations {
|
||||
require.Equal(t, dropExtension(storedMigrations[idx].Name()), names[idx])
|
||||
require.NoError(t, m.Up(context.Background(), source))
|
||||
names, err := s.allMigrationNames()
|
||||
require.NoError(t, err)
|
||||
migrationNamesMatch(t, names, expected)
|
||||
}
|
||||
|
||||
func migrateDownAndCheck(t *testing.T, m *Migrator, s *SqlStore, source embed.FS, expected []fs.DirEntry, untilMigration int) {
|
||||
t.Helper()
|
||||
|
||||
require.NoError(t, m.Down(context.Background(), untilMigration, source))
|
||||
names, err := s.allMigrationNames()
|
||||
require.NoError(t, err)
|
||||
migrationNamesMatch(t, names, expected)
|
||||
}
|
||||
|
||||
func migrationNamesMatch(t *testing.T, names []string, files []fs.DirEntry) {
|
||||
t.Helper()
|
||||
|
||||
require.Equal(t, len(names), len(files))
|
||||
|
||||
for idx := range files {
|
||||
require.Equal(t, dropExtension(files[idx].Name()), names[idx])
|
||||
}
|
||||
}
|
||||
|
|
|
@ -293,7 +293,7 @@ func (s *SqlStore) migrateRestored(ctx context.Context, tempFileName string) err
|
|||
s.log.With(zap.String("service", "sqlite restore migrations")),
|
||||
)
|
||||
|
||||
return restoreMigrator.Up(ctx, sqliteMigrations.All)
|
||||
return restoreMigrator.Up(ctx, sqliteMigrations.AllUp)
|
||||
}
|
||||
|
||||
func (s *SqlStore) execTrans(ctx context.Context, stmt string) error {
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE migrations;
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE test_table_1;
|
|
@ -0,0 +1,13 @@
|
|||
ALTER TABLE test_table_1 RENAME TO _test_table_1_old;
|
||||
|
||||
CREATE TABLE test_table_1 (
|
||||
id TEXT NOT NULL PRIMARY KEY,
|
||||
created_at TIMESTAMP,
|
||||
updated_at TIMESTAMP
|
||||
);
|
||||
|
||||
INSERT INTO test_table_1 (id, updated_at, created_at)
|
||||
SELECT org_id, updated_at, created_at
|
||||
FROM _test_table_1_old;
|
||||
|
||||
DROP TABLE _test_table_1_old;
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE test_table_2;
|
|
@ -2,11 +2,17 @@ package test_migrations
|
|||
|
||||
import "embed"
|
||||
|
||||
//go:embed *.sql
|
||||
var All embed.FS
|
||||
//go:embed *.up.sql
|
||||
var AllUp embed.FS
|
||||
|
||||
//go:embed 0001_create_migrations_table.sql
|
||||
//go:embed *.down.sql
|
||||
var AllDown embed.FS
|
||||
|
||||
//go:embed 0001_create_migrations_table.up.sql
|
||||
var MigrationTable embed.FS
|
||||
|
||||
//go:embed 0001_create_migrations_table.sql 0002_create_test_table_1.sql
|
||||
var First embed.FS
|
||||
//go:embed 0001_create_migrations_table.up.sql 0002_create_test_table_1.up.sql
|
||||
var FirstUp embed.FS
|
||||
|
||||
//go:embed 0001_create_migrations_table.down.sql 0002_create_test_table_1.down.sql
|
||||
var FirstDown embed.FS
|
||||
|
|
Loading…
Reference in New Issue