diff --git a/iox_catalog/src/migrate.rs b/iox_catalog/src/migrate.rs index 6e03e0b78d..64c22db285 100644 --- a/iox_catalog/src/migrate.rs +++ b/iox_catalog/src/migrate.rs @@ -42,8 +42,63 @@ //! migrations being stuck. We plan to fix this, see for more //! details. //! +//! ## Updating / Fixing Migrations +//! **⚠️ In general a migration MUST NOT be updated / changed after it was committed to `main`. ⚠️** +//! +//! However there is one exception to this rule: if the new version has the same outcome when applied successfully. This +//! can be due to: +//! +//! - **Optimization:** The migration script turns out to be too slow in production workloads but you find a better +//! version that does the same but runs faster. +//! - **Failure:** The script worked fine during testing but in prod it always fails, e.g. because it is missing NULL +//! handling. It is important to remember that the fix MUST NOT change the outcome of the success runs. +//! - **Idemptoency:** The script works only w/o transactions (see section above) and cannot be re-applied when be +//! interrupted midway. One common case is `CREATE INDEX CONCURRENTLY ...` where you MUST drop the index beforehand +//! via `DROP INDEX IF EXISTS ...` because a previous interrupted migration might have left it in an invalid state. +//! See ["Building Indexes Concurrently"]. +//! +//! If you are very sure that you found a fix for your migration that does the same, you still MUST NOT just change it. +//! The reason is that we keep a checksum of the migration stored in the database and changing the script will change +//! the checksum will lead to a [failure](MigrateError::VersionMismatch) when running the migrations. You can work +//! around that by obtaining the old checksum (in hex) and add it to the new version as: `-- IOX_OTHER_CHECKSUM: +//! 42feedbull`. This pragma can be repeated multiple times. +//! +//! ### Example +//! The old script looks like this: +//! +//! ```sql +//! -- IOX_NO_TRANSACTION +//! SET statement_timeout TO '60min'; +//! +//! -- IOX_STEP_BOUNDARY +//! +//! -- IOX_NO_TRANSACTION +//! CREATE INDEX CONCURRENTLY IF NOT EXISTS i ON t (x); +//! ``` +//! +//! You can fix the idemptoency by updating it to: +//! +//! ```sql +//! -- IOX_OTHER_CHECKSUM: 067431eaa74f26ee86200aaed4992a5fe22354322102f1ed795e424ec529469079569072d856e96ee9fdb6cc848b6137 +//! -- IOX_NO_TRANSACTION +//! SET statement_timeout TO '60min'; +//! +//! -- IOX_STEP_BOUNDARY +//! DROP INDEX CONCURRENTLY IF EXISTS i; +//! +//! -- IOX_NO_TRANSACTION +//! +//! -- IOX_STEP_BOUNDARY +//! +//! -- IOX_NO_TRANSACTION +//! CREATE INDEX CONCURRENTLY IF NOT EXISTS i ON t (x); +//! ``` +//! //! ## Non-SQL steps //! At the moment, we only support SQL-based migrationsteps but other step types can easily be added. +//! +//! +//! ["Building Indexes Concurrently"]: https://www.postgresql.org/docs/15/sql-createindex.html#SQL-CREATEINDEX-CONCURRENTLY use std::{ borrow::Cow, @@ -189,6 +244,13 @@ pub struct IOxMigration { /// Checksum of the given steps. pub checksum: Checksum, + + /// Checksums of other versions of this migration that are known to be compatible. + /// + /// **Using this should be a rare exception!** + /// + /// This can be used to convert an non-idempotent migration into an idempotent one. + pub other_compatible_checksums: Box<[Checksum]>, } impl IOxMigration { @@ -289,6 +351,15 @@ impl TryFrom<&Migration> for IOxMigration { )); } + let other_compatible_checksums = migration + .sql + .lines() + .filter_map(|s| { + s.strip_prefix("-- IOX_OTHER_CHECKSUM:") + .map(|s| s.trim().parse()) + }) + .collect::>()?; + let steps = migration .sql .split("-- IOX_STEP_BOUNDARY") @@ -308,6 +379,7 @@ impl TryFrom<&Migration> for IOxMigration { steps, // Keep original (unprocessed) checksum for backwards compatibility. checksum: migration.checksum.deref().into(), + other_compatible_checksums, }) } } @@ -454,7 +526,10 @@ fn validate_applied_migrations( return Err(MigrateError::VersionMissing(applied_migration.version)); } Some(migration) => { - if migration.checksum.as_bytes() != applied_migration.checksum.deref() { + if !std::iter::once(&migration.checksum) + .chain(migration.other_compatible_checksums.iter()) + .any(|cs| cs.as_bytes() == applied_migration.checksum.deref()) + { return Err(MigrateError::VersionMismatch(migration.version)); } } @@ -676,12 +751,14 @@ mod tests { description: "".into(), steps: [].into(), checksum: [].into(), + other_compatible_checksums: [].into(), }, IOxMigration { version: 1, description: "".into(), steps: [].into(), checksum: [].into(), + other_compatible_checksums: [].into(), }, ]) .unwrap_err(); @@ -700,12 +777,14 @@ mod tests { description: "".into(), steps: [].into(), checksum: [].into(), + other_compatible_checksums: [].into(), }, IOxMigration { version: 2, description: "".into(), steps: [].into(), checksum: [].into(), + other_compatible_checksums: [].into(), }, ]) .unwrap_err(); @@ -789,6 +868,28 @@ mod tests { ); } + #[test] + fn test_convert_migrator_from_sqlx_error_invalid_other_compatible_checksum() { + let err = IOxMigrator::try_from(&Migrator { + migrations: vec![Migration { + version: 1, + description: "".into(), + migration_type: MigrationType::Simple, + sql: "-- IOX_OTHER_CHECKSUM: foo".into(), + checksum: vec![].into(), + }] + .into(), + ignore_missing: false, + locking: true, + }) + .unwrap_err(); + + assert_eq!( + err.to_string(), + "while resolving migrations: cannot parse checksum 'foo': invalid digit found in string", + ); + } + #[test] fn test_convert_migrator_from_sqlx_ok() { let actual = IOxMigrator::try_from(&Migrator { @@ -808,6 +909,14 @@ mod tests { .into(), checksum: vec![4, 5, 6].into(), }, + Migration { + version: 11, + description: "xxx".into(), + migration_type: MigrationType::Simple, + sql: "-- IOX_OTHER_CHECKSUM:1ff\n-- IOX_OTHER_CHECKSUM: 2ff \nSELECT4;" + .into(), + checksum: vec![7, 8, 9].into(), + }, ] .into(), ignore_missing: false, @@ -826,6 +935,7 @@ mod tests { }] .into(), checksum: [1, 2, 3].into(), + other_compatible_checksums: [].into(), }, IOxMigration { version: 10, @@ -842,6 +952,21 @@ mod tests { ] .into(), checksum: [4, 5, 6].into(), + other_compatible_checksums: [].into(), + }, + IOxMigration { + version: 11, + description: "xxx".into(), + steps: [IOxMigrationStep::SqlStatement { + sql: "-- IOX_OTHER_CHECKSUM:1ff\n-- IOX_OTHER_CHECKSUM: 2ff \nSELECT4;".into(), + in_transaction: true, + }] + .into(), + checksum: [7, 8, 9].into(), + other_compatible_checksums: [ + Checksum::from_str("1ff").unwrap(), + Checksum::from_str("2ff").unwrap(), + ].into(), }, ], }; @@ -882,6 +1007,7 @@ mod tests { }] .into(), checksum: [].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); let res = migrator.run_direct(conn).await; @@ -924,6 +1050,7 @@ mod tests { ] .into(), checksum: [].into(), + other_compatible_checksums: [].into(), }, IOxMigration { version: 2, @@ -934,6 +1061,7 @@ mod tests { }] .into(), checksum: [].into(), + other_compatible_checksums: [].into(), }, ]) .unwrap(); @@ -966,6 +1094,7 @@ mod tests { }] .into(), checksum: [].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); @@ -983,6 +1112,7 @@ mod tests { }] .into(), checksum: [].into(), + other_compatible_checksums: [].into(), }]), ) .unwrap(); @@ -1005,6 +1135,7 @@ mod tests { description: "".into(), steps: [].into(), checksum: [].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); @@ -1015,6 +1146,7 @@ mod tests { description: "".into(), steps: [].into(), checksum: [].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); @@ -1036,6 +1168,7 @@ mod tests { description: "".into(), steps: [].into(), checksum: [1, 2, 3].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); @@ -1046,6 +1179,7 @@ mod tests { description: "".into(), steps: [].into(), checksum: [4, 5, 6].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); @@ -1056,6 +1190,35 @@ mod tests { ); } + #[tokio::test] + async fn test_migrator_other_compatible_checksum() { + maybe_skip_integration!(); + let mut conn = setup().await; + let conn = &mut *conn; + + let migrator = IOxMigrator::try_new([IOxMigration { + version: 1, + description: "".into(), + steps: [].into(), + checksum: [1, 2, 3].into(), + other_compatible_checksums: [].into(), + }]) + .unwrap(); + + migrator.run_direct(conn).await.unwrap(); + + let migrator = IOxMigrator::try_new([IOxMigration { + version: 1, + description: "".into(), + steps: [].into(), + checksum: [4, 5, 6].into(), + other_compatible_checksums: [[1, 2, 3].into()].into(), + }]) + .unwrap(); + + migrator.run_direct(conn).await.unwrap(); + } + #[tokio::test] async fn test_migrator_fail_dirty() { maybe_skip_integration!(); @@ -1073,6 +1236,7 @@ mod tests { }] .into(), checksum: [1, 2, 3].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); @@ -1084,6 +1248,7 @@ mod tests { steps: [].into(), // same checksum, but now w/ valid steps (to simulate a once failed SQL statement) checksum: [1, 2, 3].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); @@ -1132,6 +1297,7 @@ mod tests { steps: steps_broken.into(), // use a placeholder checksum (normally this would be calculated based on the steps) checksum: [1, 2, 3].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); migrator.run_direct(conn).await.unwrap_err(); @@ -1150,6 +1316,7 @@ mod tests { steps: steps_ok.into(), // same checksum, but now w/ valid steps (to simulate a once failed SQL statement) checksum: [1, 2, 3].into(), + other_compatible_checksums: [].into(), }]) .unwrap(); @@ -1199,6 +1366,7 @@ mod tests { ] .into(), checksum: [].into(), + other_compatible_checksums: [].into(), } })) .unwrap(),