feat: sql migrator uses records of completed migrations (#22797)

* feat: sql migrator uses records of completed migrations
pull/22805/head
William Baker 2021-10-29 09:29:10 -06:00 committed by GitHub
parent 335b74b25f
commit b3b4dd6503
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 258 additions and 145 deletions

View File

@ -41,9 +41,9 @@ the same time.
A simple migration system is implemented in `migrator.go`. When starting the A simple migration system is implemented in `migrator.go`. When starting the
influx daemon, the migrator runs migrations defined in `.sql` files using influx daemon, the migrator runs migrations defined in `.sql` files using
sqlite-compatible sql scripts. These migration scripts include a statement to sqlite-compatible sql scripts. Records of these migrations are maintained in a
set the `user_version` pragma in the database, and only scripts with a higher table called "migrations". If records of migrations exist in the "migrations"
`user_version` than the currently stored pragma are executed. table that are not embedded in the binary, an error will be raised on startup.
When creating new migrations, follow the file naming convention established by When creating new migrations, follow the file naming convention established by
existing migration scripts, which should look like `00XX_script_name.sql`, where existing migration scripts, which should look like `00XX_script_name.sql`, where

View File

@ -0,0 +1,4 @@
CREATE TABLE migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
);

View File

@ -1,13 +0,0 @@
-- The user_version should match the "000X" from the file name
-- Ex: 0001_create_notebooks_table should have a user_verison of 1
PRAGMA user_version=1;
-- Create the initial table to store notebooks
CREATE TABLE notebooks (
id TEXT NOT NULL PRIMARY KEY,
org_id TEXT NOT NULL,
name TEXT NOT NULL,
spec TEXT NOT NULL,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

View File

@ -0,0 +1,8 @@
CREATE TABLE notebooks (
id TEXT NOT NULL PRIMARY KEY,
org_id TEXT NOT NULL,
name TEXT NOT NULL,
spec TEXT NOT NULL,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

View File

@ -1,7 +1,3 @@
-- The user_version should match the "000X" from the file name
-- Ex: 0001_create_notebooks_table should have a user_verison of 1
PRAGMA user_version=2;
-- Create the initial table to store streams -- Create the initial table to store streams
CREATE TABLE streams ( CREATE TABLE streams (
id VARCHAR(16) PRIMARY KEY, id VARCHAR(16) PRIMARY KEY,

View File

@ -1,7 +1,3 @@
-- The user_version should match the "000X" from the file name
-- Ex: 0001_create_notebooks_table should have a user_verison of 1
PRAGMA user_version=3;
CREATE TABLE remotes ( CREATE TABLE remotes (
id VARCHAR(16) NOT NULL PRIMARY KEY, id VARCHAR(16) NOT NULL PRIMARY KEY,
org_id VARCHAR(16) NOT NULL, org_id VARCHAR(16) NOT NULL,

View File

@ -1,7 +1,3 @@
-- The user_version should match the "000X" from the file name
-- Ex: 0001_create_notebooks_table should have a user_verison of 1
PRAGMA user_version=4;
CREATE TABLE replications CREATE TABLE replications
( (
id VARCHAR(16) NOT NULL PRIMARY KEY, id VARCHAR(16) NOT NULL PRIMARY KEY,

View File

@ -9,9 +9,17 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"go.uber.org/zap" "go.uber.org/zap"
) )
func errInvalidMigration(n string) *errors.Error {
return &errors.Error{
Code: errors.EInternal,
Msg: fmt.Sprintf(`DB contains record of unknown migration %q - if you are downgrading from a more recent version of influxdb, please run the "influxd downgrade" command from that version to revert your metadata to be compatible with this version prior to starting influxd.`, n),
}
}
type Migrator struct { type Migrator struct {
store *SqlStore store *SqlStore
log *zap.Logger log *zap.Logger
@ -32,32 +40,39 @@ func (m *Migrator) SetBackupPath(path string) {
} }
func (m *Migrator) Up(ctx context.Context, source embed.FS) error { func (m *Migrator) Up(ctx context.Context, source embed.FS) error {
list, err := source.ReadDir(".") knownMigrations, err := source.ReadDir(".")
if err != nil { if err != nil {
return err return err
} }
// sort the list according to the version number to ensure the migrations are applied in the correct order // sort the list according to the version number to ensure the migrations are applied in the correct order
sort.Slice(list, func(i, j int) bool { sort.Slice(knownMigrations, func(i, j int) bool {
return list[i].Name() < list[j].Name() return knownMigrations[i].Name() < knownMigrations[j].Name()
}) })
// get the current value for user_version from the database executedMigrations, err := m.store.allMigrationNames()
current, err := m.store.userVersion()
if err != nil { if err != nil {
return err return err
} }
// get the migration number of the latest migration for logging purposes var lastMigration int
final, err := scriptVersion(list[len(list)-1].Name()) for idx := range executedMigrations {
if err != nil { if idx > len(knownMigrations)-1 || executedMigrations[idx] != dropExtension(knownMigrations[idx].Name()) {
return err return errInvalidMigration(executedMigrations[idx])
}
lastMigration, err = scriptVersion(executedMigrations[idx])
if err != nil {
return err
}
} }
if final == current { migrationsToDo := len(knownMigrations[lastMigration:])
if migrationsToDo == 0 {
return nil return nil
} }
if m.backupPath != "" && current != 0 { if m.backupPath != "" && lastMigration != 0 {
m.log.Info("Backing up pre-migration metadata", zap.String("backup_path", m.backupPath)) m.log.Info("Backing up pre-migration metadata", zap.String("backup_path", m.backupPath))
if err := func() error { if err := func() error {
out, err := os.Create(m.backupPath) out, err := os.Create(m.backupPath)
@ -75,36 +90,23 @@ func (m *Migrator) Up(ctx context.Context, source embed.FS) error {
} }
} }
m.log.Info("Bringing up metadata migrations", zap.Int("migration_count", final-current)) m.log.Info("Bringing up metadata migrations", zap.Int("migration_count", migrationsToDo))
for _, f := range list { for _, f := range knownMigrations[lastMigration:] {
n := f.Name() n := f.Name()
// get the version of this migration script
v, err := scriptVersion(n) m.log.Debug("Executing metadata migration", zap.String("migration_name", n))
mBytes, err := source.ReadFile(n)
if err != nil { if err != nil {
return err return err
} }
// get the current value for user_version from the database. this is done in the loop as well to ensure recordStmt := fmt.Sprintf(`INSERT INTO migrations (name) VALUES (%q);`, dropExtension(n))
// that if for some reason the migrations are out of order, newer migrations are not applied after older ones.
c, err := m.store.userVersion() if err := m.store.execTrans(ctx, string(mBytes)+recordStmt); err != nil {
if err != nil {
return err return err
} }
// if the version of the script is greater than the current user_version,
// execute the script to apply the migration
if v > c {
m.log.Debug("Executing metadata migration", zap.String("migration_name", n))
mBytes, err := source.ReadFile(n)
if err != nil {
return err
}
if err := m.store.execTrans(ctx, string(mBytes)); err != nil {
return err
}
}
} }
return nil return nil
@ -120,3 +122,13 @@ func scriptVersion(filename string) (int, error) {
return vInt, nil return vInt, nil
} }
// dropExtension returns the filename excluding anything after the first "."
func dropExtension(filename string) string {
idx := strings.Index(filename, ".")
if idx == -1 {
return filename
}
return filename[:idx]
}

View File

@ -2,6 +2,7 @@ package sqlite
import ( import (
"context" "context"
"embed"
"fmt" "fmt"
"os" "os"
"testing" "testing"
@ -29,34 +30,66 @@ func TestUp(t *testing.T) {
defer clean(t) defer clean(t)
ctx := context.Background() ctx := context.Background()
// a new database should have a user_version of 0 // empty db contains no migrations
v, err := store.userVersion() names, err := store.allMigrationNames()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, v) require.Equal(t, []string(nil), names)
// run the first migrations
migrator := NewMigrator(store, zaptest.NewLogger(t)) migrator := NewMigrator(store, zaptest.NewLogger(t))
migrator.Up(ctx, test_migrations.All) require.NoError(t, migrator.Up(ctx, test_migrations.First))
names, err = store.allMigrationNames()
// user_version should now be 3 after applying the migrations
v, err = store.userVersion()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 3, v) migrationNamesMatch(t, names, test_migrations.First)
// make sure that test_table_1 had the "id" column renamed to "org_id" // run the rest of the migrations
table1Info := []*tableInfo{} err = migrator.Up(ctx, test_migrations.All)
require.NoError(t, err)
names, err = store.allMigrationNames()
require.NoError(t, err)
migrationNamesMatch(t, names, test_migrations.All)
// test_table_1 had the "id" column renamed to "org_id"
var table1Info []*tableInfo
err = store.DB.Select(&table1Info, "PRAGMA table_info(test_table_1)") err = store.DB.Select(&table1Info, "PRAGMA table_info(test_table_1)")
require.NoError(t, err) require.NoError(t, err)
require.Len(t, table1Info, 3) require.Len(t, table1Info, 3)
require.Equal(t, "org_id", table1Info[0].Name) require.Equal(t, "org_id", table1Info[0].Name)
// make sure that test_table_2 was created correctly // test_table_2 was created correctly
table2Info := []*tableInfo{} var table2Info []*tableInfo
err = store.DB.Select(&table2Info, "PRAGMA table_info(test_table_2)") err = store.DB.Select(&table2Info, "PRAGMA table_info(test_table_2)")
require.NoError(t, err) require.NoError(t, err)
require.Len(t, table2Info, 3) require.Len(t, table2Info, 3)
require.Equal(t, "user_id", table2Info[0].Name) require.Equal(t, "user_id", table2Info[0].Name)
} }
func TestUpErrors(t *testing.T) {
t.Parallel()
t.Run("only unknown migration exists", func(t *testing.T) {
store, clean := NewTestStore(t)
defer clean(t)
ctx := context.Background()
migrator := NewMigrator(store, zaptest.NewLogger(t))
require.NoError(t, migrator.Up(ctx, test_migrations.MigrationTable))
require.NoError(t, store.execTrans(ctx, `INSERT INTO migrations (name) VALUES ("0010_some_bad_migration")`))
require.Equal(t, errInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.All))
})
t.Run("known + unknown migrations exist", func(t *testing.T) {
store, clean := NewTestStore(t)
defer clean(t)
ctx := context.Background()
migrator := NewMigrator(store, zaptest.NewLogger(t))
require.NoError(t, migrator.Up(ctx, test_migrations.First))
require.NoError(t, store.execTrans(ctx, `INSERT INTO migrations (name) VALUES ("0010_some_bad_migration")`))
require.Equal(t, errInvalidMigration("0010_some_bad_migration"), migrator.Up(ctx, test_migrations.All))
})
}
func TestUpWithBackups(t *testing.T) { func TestUpWithBackups(t *testing.T) {
t.Parallel() t.Parallel()
@ -69,25 +102,21 @@ func TestUpWithBackups(t *testing.T) {
backupPath := fmt.Sprintf("%s.bak", store.path) backupPath := fmt.Sprintf("%s.bak", store.path)
migrator.SetBackupPath(backupPath) migrator.SetBackupPath(backupPath)
// Run the first migration. // Run the first migrations.
require.NoError(t, migrator.Up(ctx, test_migrations.First)) require.NoError(t, migrator.Up(ctx, test_migrations.First))
names, err := store.allMigrationNames()
// user_version should now be 1.
v, err := store.userVersion()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, v) migrationNamesMatch(t, names, test_migrations.First)
// Backup file shouldn't exist, because there was nothing to back up. // Backup file shouldn't exist, because there was nothing to back up.
_, err = os.Stat(backupPath) _, err = os.Stat(backupPath)
require.True(t, os.IsNotExist(err)) require.True(t, os.IsNotExist(err))
// Run the remaining migrations. // Run the remaining migrations.
require.NoError(t, migrator.Up(ctx, test_migrations.Rest)) require.NoError(t, migrator.Up(ctx, test_migrations.All))
names, err = store.allMigrationNames()
// user_version should now be 3.
v, err = store.userVersion()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 3, v) migrationNamesMatch(t, names, test_migrations.All)
// Backup file should now exist. // Backup file should now exist.
_, err = os.Stat(backupPath) _, err = os.Stat(backupPath)
@ -98,19 +127,19 @@ func TestUpWithBackups(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer backupStore.Close() defer backupStore.Close()
// user_version should be 1 in the backup. // Backup store contains the first migrations records.
v, err = backupStore.userVersion() backupNames, err := backupStore.allMigrationNames()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, v) migrationNamesMatch(t, backupNames, test_migrations.First)
// Run the remaining migrations on the backup. // Run the remaining migrations on the backup.
backupMigrator := NewMigrator(backupStore, logger) backupMigrator := NewMigrator(backupStore, logger)
require.NoError(t, backupMigrator.Up(ctx, test_migrations.Rest)) require.NoError(t, backupMigrator.Up(ctx, test_migrations.All))
// user_version should now be 3 in the backup. // Backup store now contains the rest of the migration records.
v, err = backupStore.userVersion() backupNames, err = backupStore.allMigrationNames()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 3, v) migrationNamesMatch(t, backupNames, test_migrations.All)
} }
func TestScriptVersion(t *testing.T) { func TestScriptVersion(t *testing.T) {
@ -156,3 +185,43 @@ func TestScriptVersion(t *testing.T) {
}) })
} }
} }
func TestDropExtension(t *testing.T) {
tests := []struct {
input string
want string
}{
{
input: "0001_some_migration",
want: "0001_some_migration",
},
{
input: "0001_some_migration.sql",
want: "0001_some_migration",
},
{
input: "0001_some_migration.down.sql",
want: "0001_some_migration",
},
{
input: "0001_some_migration.something.anything.else",
want: "0001_some_migration",
},
}
for _, tt := range tests {
got := dropExtension(tt.input)
require.Equal(t, tt.want, got)
}
}
func migrationNamesMatch(t *testing.T, names []string, files embed.FS) {
t.Helper()
storedMigrations, err := files.ReadDir(".")
require.NoError(t, err)
require.Equal(t, len(storedMigrations), len(names))
for idx := range storedMigrations {
require.Equal(t, dropExtension(storedMigrations[idx].Name()), names[idx])
}
}

View File

@ -8,7 +8,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"sync" "sync"
"github.com/influxdata/influxdb/v2/kit/tracing" "github.com/influxdata/influxdb/v2/kit/tracing"
@ -20,8 +19,9 @@ import (
) )
const ( const (
DefaultFilename = "influxd.sqlite" DefaultFilename = "influxd.sqlite"
InmemPath = ":memory:" InmemPath = ":memory:"
migrationsTableName = "migrations"
) )
// SqlStore is a wrapper around the db and provides basic functionality for maintaining the db // SqlStore is a wrapper around the db and provides basic functionality for maintaining the db
@ -77,18 +77,19 @@ func (s *SqlStore) Close() error {
return nil return nil
} }
// LockSqlStore locks the database using the mutex. This is intended to lock the database for writes. // RLockSqlStore locks the database using the mutex. This is intended to lock the database for writes.
// It is the responsibilty of implementing service code to manage locks for write operations. // It is the responsibilty of implementing service code to manage locks for write operations.
func (s *SqlStore) RLockSqlStore() { func (s *SqlStore) RLockSqlStore() {
s.Mu.RLock() s.Mu.RLock()
} }
// UnlockSqlStore unlocks the database. // RUnlockSqlStore unlocks the database.
func (s *SqlStore) RUnlockSqlStore() { func (s *SqlStore) RUnlockSqlStore() {
s.Mu.RUnlock() s.Mu.RUnlock()
} }
// Flush deletes all records for all tables in the database. // Flush deletes all records for all tables in the database except for the migration table. This method should only be
// used during end-to-end testing.
func (s *SqlStore) Flush(ctx context.Context) { func (s *SqlStore) Flush(ctx context.Context) {
tables, err := s.tableNames() tables, err := s.tableNames()
if err != nil { if err != nil {
@ -96,6 +97,10 @@ func (s *SqlStore) Flush(ctx context.Context) {
} }
for _, t := range tables { for _, t := range tables {
if t == migrationsTableName {
continue
}
stmt := fmt.Sprintf("DELETE FROM %s", t) stmt := fmt.Sprintf("DELETE FROM %s", t)
err := s.execTrans(ctx, stmt) err := s.execTrans(ctx, stmt)
if err != nil { if err != nil {
@ -316,19 +321,28 @@ func (s *SqlStore) execTrans(ctx context.Context, stmt string) error {
return nil return nil
} }
func (s *SqlStore) userVersion() (int, error) { func (s *SqlStore) allMigrationNames() ([]string, error) {
stmt := `PRAGMA user_version` checkStmt := fmt.Sprintf(`SELECT name FROM sqlite_master WHERE type='table' AND name='%s'`, migrationsTableName)
res, err := s.queryToStrings(stmt) tbls, err := s.queryToStrings(checkStmt)
if err != nil { if err != nil {
return 0, err return nil, err
} }
val, err := strconv.Atoi(res[0]) if len(tbls) == 0 {
if err != nil { return nil, nil
return 0, err
} }
return val, nil migrStmt := fmt.Sprintf(`SELECT name FROM %s ORDER BY name`, migrationsTableName)
migr, err := s.queryToStrings(migrStmt)
if err != nil {
return nil, err
}
if len(migr) == 0 {
return nil, nil
}
return migr, nil
} }
func (s *SqlStore) tableNames() ([]string, error) { func (s *SqlStore) tableNames() ([]string, error) {

View File

@ -35,6 +35,23 @@ func TestFlush(t *testing.T) {
require.Equal(t, 0, len(vals)) require.Equal(t, 0, len(vals))
} }
func TestFlushMigrationsTable(t *testing.T) {
t.Parallel()
ctx := context.Background()
store, clean := NewTestStore(t)
defer clean(t)
require.NoError(t, store.execTrans(ctx, fmt.Sprintf(`CREATE TABLE %s (id TEXT NOT NULL PRIMARY KEY)`, migrationsTableName)))
require.NoError(t, store.execTrans(ctx, fmt.Sprintf(`INSERT INTO %s (id) VALUES ("one"), ("two"), ("three")`, migrationsTableName)))
store.Flush(context.Background())
got, err := store.queryToStrings(fmt.Sprintf(`SELECT * FROM %s`, migrationsTableName))
require.NoError(t, err)
want := []string{"one", "two", "three"}
require.Equal(t, want, got)
}
func TestBackupSqlStore(t *testing.T) { func TestBackupSqlStore(t *testing.T) {
t.Parallel() t.Parallel()
@ -184,21 +201,6 @@ func TestRestoreSqlStore(t *testing.T) {
} }
} }
func TestUserVersion(t *testing.T) {
t.Parallel()
store, clean := NewTestStore(t)
defer clean(t)
ctx := context.Background()
err := store.execTrans(ctx, `PRAGMA user_version=12`)
require.NoError(t, err)
got, err := store.userVersion()
require.NoError(t, err)
require.Equal(t, 12, got)
}
func TestTableNames(t *testing.T) { func TestTableNames(t *testing.T) {
t.Parallel() t.Parallel()
@ -215,3 +217,41 @@ func TestTableNames(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []string{"test_table_1", "test_table_3", "test_table_2"}, got) require.Equal(t, []string{"test_table_1", "test_table_3", "test_table_2"}, got)
} }
func TestAllMigrationNames(t *testing.T) {
t.Parallel()
store, clean := NewTestStore(t)
defer clean(t)
ctx := context.Background()
// Empty db, returns nil slice and no error
got, err := store.allMigrationNames()
require.NoError(t, err)
require.Equal(t, []string(nil), got)
// DB contains migrations table but no migrations
err = store.execTrans(ctx, `CREATE TABLE migrations (
id TEXT NOT NULL PRIMARY KEY,
name TEXT NOT NULL)`)
require.NoError(t, err)
got, err = store.allMigrationNames()
require.NoError(t, err)
require.Equal(t, []string(nil), got)
// DB contains one migration
err = store.execTrans(ctx, `INSERT INTO migrations (id, name) VALUES ("1", "0000_create_migrations_table.sql")`)
require.NoError(t, err)
got, err = store.allMigrationNames()
require.NoError(t, err)
require.Equal(t, []string{"0000_create_migrations_table.sql"}, got)
// DB contains multiple migrations - they are returned sorted by name
err = store.execTrans(ctx, `INSERT INTO migrations (id, name) VALUES ("3", "0001_first_migration.sql")`)
require.NoError(t, err)
err = store.execTrans(ctx, `INSERT INTO migrations (id, name) VALUES ("2", "0002_second_migration.sql")`)
require.NoError(t, err)
got, err = store.allMigrationNames()
require.NoError(t, err)
require.Equal(t, []string{"0000_create_migrations_table.sql", "0001_first_migration.sql", "0002_second_migration.sql"}, got)
}

View File

@ -0,0 +1,4 @@
CREATE TABLE migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
);

View File

@ -1,10 +0,0 @@
-- The user_version should match the "000X" from the file name
-- Ex: 0001_create_notebooks_table should have a user_verison of 1
PRAGMA user_version=1;
-- Create the testing table
CREATE TABLE test_table_1 (
id TEXT NOT NULL PRIMARY KEY,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

View File

@ -0,0 +1,5 @@
CREATE TABLE test_table_1 (
id TEXT NOT NULL PRIMARY KEY,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

View File

@ -1,9 +0,0 @@
-- The user_version should match the "000X" from the file name
-- Ex: 0001_create_notebooks_table should have a user_verison of 1
PRAGMA user_version=3;
CREATE TABLE test_table_2 (
user_id TEXT NOT NULL PRIMARY KEY,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

View File

@ -1,7 +1,3 @@
-- The user_version should match the "000X" from the file name
-- Ex: 0001_create_notebooks_table should have a user_verison of 1
PRAGMA user_version=2;
ALTER TABLE test_table_1 RENAME TO _test_table_1_old; ALTER TABLE test_table_1 RENAME TO _test_table_1_old;
CREATE TABLE test_table_1 ( CREATE TABLE test_table_1 (

View File

@ -0,0 +1,5 @@
CREATE TABLE test_table_2 (
user_id TEXT NOT NULL PRIMARY KEY,
created_at TIMESTAMP,
updated_at TIMESTAMP
);

View File

@ -5,8 +5,8 @@ import "embed"
//go:embed *.sql //go:embed *.sql
var All embed.FS var All embed.FS
//go:embed 0001_create_test_table_1.sql //go:embed 0001_create_migrations_table.sql
var First embed.FS var MigrationTable embed.FS
//go:embed 0002_rename_test_table_id_1.sql 0003_create_test_table_2.sql //go:embed 0001_create_migrations_table.sql 0002_create_test_table_1.sql
var Rest embed.FS var First embed.FS