feat: allow fixing migrations (#8394)

We need to fix our non-idempotent `CREATE INDEX CONCURRENTLY` migrations
so they are self-healing. This is an essential part of #7897.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-08-04 10:05:45 +02:00 committed by GitHub
parent b7c0bcb61f
commit 0c9570889b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 169 additions and 1 deletions

View File

@ -42,8 +42,63 @@
//! migrations being stuck. We plan to fix this, see <https://github.com/influxdata/influxdb_iox/issues/7897> for more
//! details.
//!
//! ## Updating / Fixing Migrations
//! **⚠️ In general a migration MUST NOT be updated / changed after it was committed to `main`. ⚠️**
//!
//! However there is one exception to this rule: if the new version has the same outcome when applied successfully. This
//! can be due to:
//!
//! - **Optimization:** The migration script turns out to be too slow in production workloads but you find a better
//! version that does the same but runs faster.
//! - **Failure:** The script worked fine during testing but in prod it always fails, e.g. because it is missing NULL
//! handling. It is important to remember that the fix MUST NOT change the outcome of the success runs.
//! - **Idemptoency:** The script works only w/o transactions (see section above) and cannot be re-applied when be
//! interrupted midway. One common case is `CREATE INDEX CONCURRENTLY ...` where you MUST drop the index beforehand
//! via `DROP INDEX IF EXISTS ...` because a previous interrupted migration might have left it in an invalid state.
//! See ["Building Indexes Concurrently"].
//!
//! If you are very sure that you found a fix for your migration that does the same, you still MUST NOT just change it.
//! The reason is that we keep a checksum of the migration stored in the database and changing the script will change
//! the checksum will lead to a [failure](MigrateError::VersionMismatch) when running the migrations. You can work
//! around that by obtaining the old checksum (in hex) and add it to the new version as: `-- IOX_OTHER_CHECKSUM:
//! 42feedbull`. This pragma can be repeated multiple times.
//!
//! ### Example
//! The old script looks like this:
//!
//! ```sql
//! -- IOX_NO_TRANSACTION
//! SET statement_timeout TO '60min';
//!
//! -- IOX_STEP_BOUNDARY
//!
//! -- IOX_NO_TRANSACTION
//! CREATE INDEX CONCURRENTLY IF NOT EXISTS i ON t (x);
//! ```
//!
//! You can fix the idemptoency by updating it to:
//!
//! ```sql
//! -- IOX_OTHER_CHECKSUM: 067431eaa74f26ee86200aaed4992a5fe22354322102f1ed795e424ec529469079569072d856e96ee9fdb6cc848b6137
//! -- IOX_NO_TRANSACTION
//! SET statement_timeout TO '60min';
//!
//! -- IOX_STEP_BOUNDARY
//! DROP INDEX CONCURRENTLY IF EXISTS i;
//!
//! -- IOX_NO_TRANSACTION
//!
//! -- IOX_STEP_BOUNDARY
//!
//! -- IOX_NO_TRANSACTION
//! CREATE INDEX CONCURRENTLY IF NOT EXISTS i ON t (x);
//! ```
//!
//! ## Non-SQL steps
//! At the moment, we only support SQL-based migrationsteps but other step types can easily be added.
//!
//!
//! ["Building Indexes Concurrently"]: https://www.postgresql.org/docs/15/sql-createindex.html#SQL-CREATEINDEX-CONCURRENTLY
use std::{
borrow::Cow,
@ -189,6 +244,13 @@ pub struct IOxMigration {
/// Checksum of the given steps.
pub checksum: Checksum,
/// Checksums of other versions of this migration that are known to be compatible.
///
/// **Using this should be a rare exception!**
///
/// This can be used to convert an non-idempotent migration into an idempotent one.
pub other_compatible_checksums: Box<[Checksum]>,
}
impl IOxMigration {
@ -289,6 +351,15 @@ impl TryFrom<&Migration> for IOxMigration {
));
}
let other_compatible_checksums = migration
.sql
.lines()
.filter_map(|s| {
s.strip_prefix("-- IOX_OTHER_CHECKSUM:")
.map(|s| s.trim().parse())
})
.collect::<Result<_, _>>()?;
let steps = migration
.sql
.split("-- IOX_STEP_BOUNDARY")
@ -308,6 +379,7 @@ impl TryFrom<&Migration> for IOxMigration {
steps,
// Keep original (unprocessed) checksum for backwards compatibility.
checksum: migration.checksum.deref().into(),
other_compatible_checksums,
})
}
}
@ -454,7 +526,10 @@ fn validate_applied_migrations(
return Err(MigrateError::VersionMissing(applied_migration.version));
}
Some(migration) => {
if migration.checksum.as_bytes() != applied_migration.checksum.deref() {
if !std::iter::once(&migration.checksum)
.chain(migration.other_compatible_checksums.iter())
.any(|cs| cs.as_bytes() == applied_migration.checksum.deref())
{
return Err(MigrateError::VersionMismatch(migration.version));
}
}
@ -676,12 +751,14 @@ mod tests {
description: "".into(),
steps: [].into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
},
IOxMigration {
version: 1,
description: "".into(),
steps: [].into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
},
])
.unwrap_err();
@ -700,12 +777,14 @@ mod tests {
description: "".into(),
steps: [].into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
},
IOxMigration {
version: 2,
description: "".into(),
steps: [].into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
},
])
.unwrap_err();
@ -789,6 +868,28 @@ mod tests {
);
}
#[test]
fn test_convert_migrator_from_sqlx_error_invalid_other_compatible_checksum() {
let err = IOxMigrator::try_from(&Migrator {
migrations: vec![Migration {
version: 1,
description: "".into(),
migration_type: MigrationType::Simple,
sql: "-- IOX_OTHER_CHECKSUM: foo".into(),
checksum: vec![].into(),
}]
.into(),
ignore_missing: false,
locking: true,
})
.unwrap_err();
assert_eq!(
err.to_string(),
"while resolving migrations: cannot parse checksum 'foo': invalid digit found in string",
);
}
#[test]
fn test_convert_migrator_from_sqlx_ok() {
let actual = IOxMigrator::try_from(&Migrator {
@ -808,6 +909,14 @@ mod tests {
.into(),
checksum: vec![4, 5, 6].into(),
},
Migration {
version: 11,
description: "xxx".into(),
migration_type: MigrationType::Simple,
sql: "-- IOX_OTHER_CHECKSUM:1ff\n-- IOX_OTHER_CHECKSUM: 2ff \nSELECT4;"
.into(),
checksum: vec![7, 8, 9].into(),
},
]
.into(),
ignore_missing: false,
@ -826,6 +935,7 @@ mod tests {
}]
.into(),
checksum: [1, 2, 3].into(),
other_compatible_checksums: [].into(),
},
IOxMigration {
version: 10,
@ -842,6 +952,21 @@ mod tests {
]
.into(),
checksum: [4, 5, 6].into(),
other_compatible_checksums: [].into(),
},
IOxMigration {
version: 11,
description: "xxx".into(),
steps: [IOxMigrationStep::SqlStatement {
sql: "-- IOX_OTHER_CHECKSUM:1ff\n-- IOX_OTHER_CHECKSUM: 2ff \nSELECT4;".into(),
in_transaction: true,
}]
.into(),
checksum: [7, 8, 9].into(),
other_compatible_checksums: [
Checksum::from_str("1ff").unwrap(),
Checksum::from_str("2ff").unwrap(),
].into(),
},
],
};
@ -882,6 +1007,7 @@ mod tests {
}]
.into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
let res = migrator.run_direct(conn).await;
@ -924,6 +1050,7 @@ mod tests {
]
.into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
},
IOxMigration {
version: 2,
@ -934,6 +1061,7 @@ mod tests {
}]
.into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
},
])
.unwrap();
@ -966,6 +1094,7 @@ mod tests {
}]
.into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
@ -983,6 +1112,7 @@ mod tests {
}]
.into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
}]),
)
.unwrap();
@ -1005,6 +1135,7 @@ mod tests {
description: "".into(),
steps: [].into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
@ -1015,6 +1146,7 @@ mod tests {
description: "".into(),
steps: [].into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
@ -1036,6 +1168,7 @@ mod tests {
description: "".into(),
steps: [].into(),
checksum: [1, 2, 3].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
@ -1046,6 +1179,7 @@ mod tests {
description: "".into(),
steps: [].into(),
checksum: [4, 5, 6].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
@ -1056,6 +1190,35 @@ mod tests {
);
}
#[tokio::test]
async fn test_migrator_other_compatible_checksum() {
maybe_skip_integration!();
let mut conn = setup().await;
let conn = &mut *conn;
let migrator = IOxMigrator::try_new([IOxMigration {
version: 1,
description: "".into(),
steps: [].into(),
checksum: [1, 2, 3].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
migrator.run_direct(conn).await.unwrap();
let migrator = IOxMigrator::try_new([IOxMigration {
version: 1,
description: "".into(),
steps: [].into(),
checksum: [4, 5, 6].into(),
other_compatible_checksums: [[1, 2, 3].into()].into(),
}])
.unwrap();
migrator.run_direct(conn).await.unwrap();
}
#[tokio::test]
async fn test_migrator_fail_dirty() {
maybe_skip_integration!();
@ -1073,6 +1236,7 @@ mod tests {
}]
.into(),
checksum: [1, 2, 3].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
@ -1084,6 +1248,7 @@ mod tests {
steps: [].into(),
// same checksum, but now w/ valid steps (to simulate a once failed SQL statement)
checksum: [1, 2, 3].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
@ -1132,6 +1297,7 @@ mod tests {
steps: steps_broken.into(),
// use a placeholder checksum (normally this would be calculated based on the steps)
checksum: [1, 2, 3].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
migrator.run_direct(conn).await.unwrap_err();
@ -1150,6 +1316,7 @@ mod tests {
steps: steps_ok.into(),
// same checksum, but now w/ valid steps (to simulate a once failed SQL statement)
checksum: [1, 2, 3].into(),
other_compatible_checksums: [].into(),
}])
.unwrap();
@ -1199,6 +1366,7 @@ mod tests {
]
.into(),
checksum: [].into(),
other_compatible_checksums: [].into(),
}
}))
.unwrap(),