refactor: migration checksum type (#8388)
* refactor: use `Box<[...]>` instead of `Vec<...>` We are not planning to modify the vector, so storing a capacity and a length is somewhat pointless. * feat: add printout test for PG migrations * refactor: use dedicated checksum type * feat: checksum string roundtrips --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
76c766330a
commit
9e4e205ffd
|
@ -2788,9 +2788,11 @@ dependencies = [
|
|||
"mutable_batch",
|
||||
"mutable_batch_lp",
|
||||
"observability_deps",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"paste",
|
||||
"pretty_assertions",
|
||||
"proptest",
|
||||
"rand",
|
||||
"serde",
|
||||
"siphasher",
|
||||
|
|
|
@ -14,6 +14,7 @@ log = "0.4"
|
|||
metric = { version = "0.1.0", path = "../metric" }
|
||||
mutable_batch = { path = "../mutable_batch" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
once_cell = { version = "1.18", features = ["parking_lot"] }
|
||||
parking_lot = { version = "0.12" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
siphasher = "0.3"
|
||||
|
@ -32,6 +33,7 @@ generated_types = { path = "../generated_types" }
|
|||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
paste = "1.0.14"
|
||||
pretty_assertions = "1.4.0"
|
||||
proptest = { version = "1", default_features = false, features = ["std"] }
|
||||
rand = "0.8"
|
||||
tempfile = "3"
|
||||
test_helpers = { path = "../test_helpers" }
|
||||
|
|
|
@ -50,6 +50,7 @@ use std::{
|
|||
collections::{HashMap, HashSet},
|
||||
hash::{Hash, Hasher},
|
||||
ops::Deref,
|
||||
str::FromStr,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
|
@ -116,6 +117,59 @@ impl IOxMigrationStep {
|
|||
}
|
||||
}
|
||||
|
||||
/// Migration checksum.
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
pub struct Checksum(Box<[u8]>);
|
||||
|
||||
impl Checksum {
|
||||
fn as_bytes(&self) -> &[u8] {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Checksum {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
for b in self.0.iter() {
|
||||
write!(f, "{:02x}", b)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Checksum {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize> From<[u8; N]> for Checksum {
|
||||
fn from(value: [u8; N]) -> Self {
|
||||
Self(value.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&[u8]> for Checksum {
|
||||
fn from(value: &[u8]) -> Self {
|
||||
Self(value.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for Checksum {
|
||||
type Err = MigrateError;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let inner = (0..s.len())
|
||||
.step_by(2)
|
||||
.map(|i| u8::from_str_radix(&s[i..(i + 2).min(s.len())], 16))
|
||||
.collect::<Result<Box<[u8]>, _>>()
|
||||
.map_err(|e| {
|
||||
MigrateError::Source(format!("cannot parse checksum '{s}': {e}").into())
|
||||
})?;
|
||||
|
||||
Ok(Self(inner))
|
||||
}
|
||||
}
|
||||
|
||||
/// Database migration.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct IOxMigration {
|
||||
|
@ -131,10 +185,10 @@ pub struct IOxMigration {
|
|||
///
|
||||
/// In most cases you want a single [SQL step](IOxMigrationStep::SqlStatement) which is executed
|
||||
/// [in a transaction](IOxMigrationStep::SqlStatement::in_transaction).
|
||||
pub steps: Vec<IOxMigrationStep>,
|
||||
pub steps: Box<[IOxMigrationStep]>,
|
||||
|
||||
/// Checksum of the given steps.
|
||||
pub checksum: Cow<'static, [u8]>,
|
||||
pub checksum: Checksum,
|
||||
}
|
||||
|
||||
impl IOxMigration {
|
||||
|
@ -246,7 +300,7 @@ impl From<&Migration> for IOxMigration {
|
|||
description: migration.description.clone(),
|
||||
steps,
|
||||
// Keep original (unprocessed) checksum for backwards compatibility.
|
||||
checksum: migration.checksum.clone(),
|
||||
checksum: migration.checksum.deref().into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -376,7 +430,7 @@ fn validate_applied_migrations(
|
|||
return Err(MigrateError::VersionMissing(applied_migration.version));
|
||||
}
|
||||
Some(migration) => {
|
||||
if migration.checksum != applied_migration.checksum {
|
||||
if migration.checksum.as_bytes() != applied_migration.checksum.deref() {
|
||||
return Err(MigrateError::VersionMismatch(migration.version));
|
||||
}
|
||||
}
|
||||
|
@ -514,7 +568,7 @@ VALUES ( $1, $2, FALSE, $3, -1 )
|
|||
)
|
||||
.bind(migration.version)
|
||||
.bind(&*migration.description)
|
||||
.bind(&*migration.checksum)
|
||||
.bind(migration.checksum.as_bytes())
|
||||
.execute(self)
|
||||
.await?;
|
||||
|
||||
|
@ -554,6 +608,42 @@ mod tests {
|
|||
mod generic {
|
||||
use super::*;
|
||||
|
||||
use proptest::prelude::*;
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn test_checksum_string_roundtrip(s: Vec<u8>) {
|
||||
let checksum_1 = Checksum::from(s.as_slice());
|
||||
let string_1 = checksum_1.to_string();
|
||||
let checksum_2 = Checksum::from_str(&string_1).unwrap();
|
||||
let string_2 = checksum_2.to_string();
|
||||
assert_eq!(checksum_1, checksum_2);
|
||||
assert_eq!(string_1, string_2);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_valid_checksum() {
|
||||
let actual = Checksum::from_str("b88c635e27f8b9ba8547b24efcb081429a8f3e85b70f35916e1900dffc4e6a77eed8a02acc7c72526dd7d50166b63fbd").unwrap();
|
||||
let expected = Checksum::from([
|
||||
184, 140, 99, 94, 39, 248, 185, 186, 133, 71, 178, 78, 252, 176, 129, 66, 154, 143,
|
||||
62, 133, 183, 15, 53, 145, 110, 25, 0, 223, 252, 78, 106, 119, 238, 216, 160, 42,
|
||||
204, 124, 114, 82, 109, 215, 213, 1, 102, 182, 63, 189,
|
||||
]);
|
||||
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_invalid_checksum() {
|
||||
let err = Checksum::from_str("foo").unwrap_err();
|
||||
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"while resolving migrations: cannot parse checksum 'foo': invalid digit found in string",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(
|
||||
expected = "migrations are not sorted: version 2 is before 1 but should not"
|
||||
|
@ -563,14 +653,14 @@ mod tests {
|
|||
IOxMigration {
|
||||
version: 2,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
checksum: vec![].into(),
|
||||
steps: [].into(),
|
||||
checksum: [].into(),
|
||||
},
|
||||
IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
checksum: vec![].into(),
|
||||
steps: [].into(),
|
||||
checksum: [].into(),
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
@ -582,14 +672,14 @@ mod tests {
|
|||
IOxMigration {
|
||||
version: 2,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
checksum: vec![].into(),
|
||||
steps: [].into(),
|
||||
checksum: [].into(),
|
||||
},
|
||||
IOxMigration {
|
||||
version: 2,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
checksum: vec![].into(),
|
||||
steps: [].into(),
|
||||
checksum: [].into(),
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
@ -678,16 +768,17 @@ mod tests {
|
|||
IOxMigration {
|
||||
version: 1,
|
||||
description: "some descr".into(),
|
||||
steps: vec![IOxMigrationStep::SqlStatement {
|
||||
steps: [IOxMigrationStep::SqlStatement {
|
||||
sql: "SELECT 1;".into(),
|
||||
in_transaction: true,
|
||||
}],
|
||||
checksum: vec![1, 2, 3].into(),
|
||||
}]
|
||||
.into(),
|
||||
checksum: [1, 2, 3].into(),
|
||||
},
|
||||
IOxMigration {
|
||||
version: 10,
|
||||
description: "more descr".into(),
|
||||
steps: vec![
|
||||
steps: [
|
||||
IOxMigrationStep::SqlStatement {
|
||||
sql: "SELECT 2;".into(),
|
||||
in_transaction: true,
|
||||
|
@ -696,8 +787,9 @@ mod tests {
|
|||
sql: "-- IOX_NO_TRANSACTION\nSELECT 3;".into(),
|
||||
in_transaction: false,
|
||||
},
|
||||
],
|
||||
checksum: vec![4, 5, 6].into(),
|
||||
]
|
||||
.into(),
|
||||
checksum: [4, 5, 6].into(),
|
||||
},
|
||||
],
|
||||
};
|
||||
|
@ -732,11 +824,12 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![IOxMigrationStep::SqlStatement {
|
||||
steps: [IOxMigrationStep::SqlStatement {
|
||||
sql: "CREATE INDEX CONCURRENTLY i ON t (x);".into(),
|
||||
in_transaction,
|
||||
}],
|
||||
checksum: vec![].into(),
|
||||
}]
|
||||
.into(),
|
||||
checksum: [].into(),
|
||||
}]);
|
||||
let res = migrator.run_direct(conn).await;
|
||||
|
||||
|
@ -765,7 +858,7 @@ mod tests {
|
|||
IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![
|
||||
steps: [
|
||||
IOxMigrationStep::SqlStatement {
|
||||
sql: "CREATE TABLE t (x INT);".into(),
|
||||
in_transaction: false,
|
||||
|
@ -775,17 +868,19 @@ mod tests {
|
|||
.into(),
|
||||
in_transaction: true,
|
||||
},
|
||||
],
|
||||
checksum: vec![].into(),
|
||||
]
|
||||
.into(),
|
||||
checksum: [].into(),
|
||||
},
|
||||
IOxMigration {
|
||||
version: 2,
|
||||
description: "".into(),
|
||||
steps: vec![IOxMigrationStep::SqlStatement {
|
||||
steps: [IOxMigrationStep::SqlStatement {
|
||||
sql: "INSERT INTO t (x) VALUES (100);".into(),
|
||||
in_transaction: true,
|
||||
}],
|
||||
checksum: vec![].into(),
|
||||
}]
|
||||
.into(),
|
||||
checksum: [].into(),
|
||||
},
|
||||
]);
|
||||
|
||||
|
@ -810,28 +905,31 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![IOxMigrationStep::SqlStatement {
|
||||
steps: [IOxMigrationStep::SqlStatement {
|
||||
// NOT idempotent!
|
||||
sql: "CREATE TABLE t (x INT);".into(),
|
||||
in_transaction: false,
|
||||
}],
|
||||
checksum: vec![].into(),
|
||||
}]
|
||||
.into(),
|
||||
checksum: [].into(),
|
||||
}]);
|
||||
|
||||
let applied = migrator.run_direct(conn).await.unwrap();
|
||||
assert_eq!(applied, HashSet::from([1]));
|
||||
|
||||
let migrator =
|
||||
IOxMigrator::new(migrator.migrations.iter().cloned().chain([IOxMigration {
|
||||
let migrator = IOxMigrator::new(
|
||||
migrator.migrations.iter().cloned().chain([IOxMigration {
|
||||
version: 2,
|
||||
description: "".into(),
|
||||
steps: vec![IOxMigrationStep::SqlStatement {
|
||||
steps: [IOxMigrationStep::SqlStatement {
|
||||
// NOT idempotent!
|
||||
sql: "CREATE TABLE s (x INT);".into(),
|
||||
in_transaction: false,
|
||||
}],
|
||||
checksum: vec![].into(),
|
||||
}]));
|
||||
}]
|
||||
.into(),
|
||||
checksum: [].into(),
|
||||
}]),
|
||||
);
|
||||
|
||||
let applied = migrator.run_direct(conn).await.unwrap();
|
||||
assert_eq!(applied, HashSet::from([2]));
|
||||
|
@ -849,8 +947,8 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
checksum: vec![].into(),
|
||||
steps: [].into(),
|
||||
checksum: [].into(),
|
||||
}]);
|
||||
|
||||
migrator.run_direct(conn).await.unwrap();
|
||||
|
@ -858,8 +956,8 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 2,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
checksum: vec![].into(),
|
||||
steps: [].into(),
|
||||
checksum: [].into(),
|
||||
}]);
|
||||
|
||||
let err = migrator.run_direct(conn).await.unwrap_err();
|
||||
|
@ -878,8 +976,8 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
checksum: vec![1, 2, 3].into(),
|
||||
steps: [].into(),
|
||||
checksum: [1, 2, 3].into(),
|
||||
}]);
|
||||
|
||||
migrator.run_direct(conn).await.unwrap();
|
||||
|
@ -887,8 +985,8 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
checksum: vec![4, 5, 6].into(),
|
||||
steps: [].into(),
|
||||
checksum: [4, 5, 6].into(),
|
||||
}]);
|
||||
|
||||
let err = migrator.run_direct(conn).await.unwrap_err();
|
||||
|
@ -907,13 +1005,14 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![IOxMigrationStep::SqlStatement {
|
||||
steps: [IOxMigrationStep::SqlStatement {
|
||||
sql: "foo".into(),
|
||||
// set to NO transaction, otherwise the migrator will happily wrap the migration bookkeeping and the
|
||||
// migration script itself into a single transaction to avoid the "dirty" state
|
||||
in_transaction: false,
|
||||
}],
|
||||
checksum: vec![1, 2, 3].into(),
|
||||
}]
|
||||
.into(),
|
||||
checksum: [1, 2, 3].into(),
|
||||
}]);
|
||||
|
||||
migrator.run_direct(conn).await.unwrap_err();
|
||||
|
@ -921,9 +1020,9 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: vec![],
|
||||
steps: [].into(),
|
||||
// same checksum, but now w/ valid steps (to simulate a once failed SQL statement)
|
||||
checksum: vec![1, 2, 3].into(),
|
||||
checksum: [1, 2, 3].into(),
|
||||
}]);
|
||||
|
||||
let err = migrator.run_direct(conn).await.unwrap_err();
|
||||
|
@ -968,9 +1067,9 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: steps_broken,
|
||||
steps: steps_broken.into(),
|
||||
// use a placeholder checksum (normally this would be calculated based on the steps)
|
||||
checksum: vec![1, 2, 3].into(),
|
||||
checksum: [1, 2, 3].into(),
|
||||
}]);
|
||||
migrator.run_direct(conn).await.unwrap_err();
|
||||
|
||||
|
@ -985,9 +1084,9 @@ mod tests {
|
|||
let migrator = IOxMigrator::new([IOxMigration {
|
||||
version: 1,
|
||||
description: "".into(),
|
||||
steps: steps_ok,
|
||||
steps: steps_ok.into(),
|
||||
// same checksum, but now w/ valid steps (to simulate a once failed SQL statement)
|
||||
checksum: vec![1, 2, 3].into(),
|
||||
checksum: [1, 2, 3].into(),
|
||||
}]);
|
||||
|
||||
let applied = migrator.run_direct(conn).await.unwrap();
|
||||
|
@ -1023,7 +1122,7 @@ mod tests {
|
|||
IOxMigration {
|
||||
version: i as i64,
|
||||
description: "".into(),
|
||||
steps: vec![
|
||||
steps: [
|
||||
IOxMigrationStep::SqlStatement {
|
||||
sql: format!("CREATE TABLE t{i} (x INT);").into(),
|
||||
in_transaction: false,
|
||||
|
@ -1032,8 +1131,9 @@ mod tests {
|
|||
sql: format!("CREATE INDEX CONCURRENTLY i{i} ON t{i} (x);").into(),
|
||||
in_transaction: false,
|
||||
},
|
||||
],
|
||||
checksum: vec![].into(),
|
||||
]
|
||||
.into(),
|
||||
checksum: [].into(),
|
||||
}
|
||||
})));
|
||||
|
||||
|
|
|
@ -28,10 +28,10 @@ use data_types::{
|
|||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use metric::{Attributes, Instrument, MetricKind};
|
||||
use observability_deps::tracing::{debug, info, warn};
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::{RwLock, RwLockWriteGuard};
|
||||
use snafu::prelude::*;
|
||||
use sqlx::{
|
||||
migrate::Migrator,
|
||||
postgres::{PgConnectOptions, PgPoolOptions},
|
||||
types::Uuid,
|
||||
Acquire, ConnectOptions, Executor, Postgres, Row,
|
||||
|
@ -42,7 +42,7 @@ use std::collections::HashSet;
|
|||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::{collections::HashMap, fmt::Display, str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
static MIGRATOR: Migrator = sqlx::migrate!();
|
||||
static MIGRATOR: Lazy<IOxMigrator> = Lazy::new(|| IOxMigrator::from(&sqlx::migrate!()));
|
||||
|
||||
/// Postgres connection options.
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -247,8 +247,7 @@ impl Catalog for PostgresCatalog {
|
|||
.await
|
||||
.map_err(|e| Error::Setup { source: e })?;
|
||||
|
||||
let migrator = IOxMigrator::from(&MIGRATOR);
|
||||
migrator
|
||||
MIGRATOR
|
||||
.run(&self.pool)
|
||||
.await
|
||||
.map_err(|e| Error::Setup { source: e.into() })?;
|
||||
|
@ -2029,7 +2028,7 @@ mod tests {
|
|||
use data_types::partition_template::TemplatePart;
|
||||
use generated_types::influxdata::iox::partition_template::v1 as proto;
|
||||
use metric::{Attributes, DurationHistogram, Metric, Observation, RawReporter};
|
||||
use std::{io::Write, sync::Arc, time::Instant};
|
||||
use std::{io::Write, ops::Deref, sync::Arc, time::Instant};
|
||||
use tempfile::NamedTempFile;
|
||||
use test_helpers::maybe_start_logging;
|
||||
|
||||
|
@ -2045,6 +2044,14 @@ mod tests {
|
|||
assert!(hit_count > 0, "metric did not record any calls");
|
||||
}
|
||||
|
||||
/// Small no-op test just to print out the migrations.
|
||||
///
|
||||
/// This is helpful to look up migration checksums and debug parsing of the migration files.
|
||||
#[test]
|
||||
fn print_migrations() {
|
||||
println!("{:#?}", MIGRATOR.deref());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_migration() {
|
||||
maybe_skip_integration!();
|
||||
|
|
Loading…
Reference in New Issue