diff --git a/Cargo.lock b/Cargo.lock
index 25429a9226..063c40f50d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2825,6 +2825,7 @@ dependencies = [
  "pretty_assertions",
  "rand 0.8.5",
  "serde",
+ "siphasher",
  "snafu",
  "sqlx",
  "sqlx-hotswap-pool",
diff --git a/iox_catalog/Cargo.toml b/iox_catalog/Cargo.toml
index 574ad8fdf0..6319e8b478 100644
--- a/iox_catalog/Cargo.toml
+++ b/iox_catalog/Cargo.toml
@@ -16,6 +16,7 @@ mutable_batch = { path = "../mutable_batch" }
 observability_deps = { path = "../observability_deps" }
 parking_lot = { version = "0.12" }
 serde = { version = "1.0", features = ["derive"] }
+siphasher = "0.3"
 snafu = "0.7"
 sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid", "sqlite" ] }
 sqlx-hotswap-pool = { path = "../sqlx-hotswap-pool" }
diff --git a/iox_catalog/README.md b/iox_catalog/README.md
index 21388cf02a..405dd0632a 100644
--- a/iox_catalog/README.md
+++ b/iox_catalog/README.md
@@ -73,3 +73,15 @@ basis. As always, there are a few ways to do that:
 ```
 psql 'dbname=iox_shared options=-csearch_path=public,iox_catalog'
 ```
+
+## Failed / Dirty Migrations
+Migrations might be marked as dirty in prod if they do not run all the way through. In this case, you have to clean up
+manually (using a read-write shell):
+
+1. Revert the effects of the migration (e.g. drop created tables, drop created indices)
+2. Remove the migration from the `_sqlx_migrations` table. E.g. if the version of the migration is 1337, this is:
+
+   ```sql
+   DELETE FROM _sqlx_migrations
+   WHERE version = 1337;
+   ```
diff --git a/iox_catalog/migrations/20230524151854_add_parquet_file_partition_and_deleted_index.sql b/iox_catalog/migrations/20230524151854_add_parquet_file_partition_and_deleted_index.sql
new file mode 100644
index 0000000000..a7ad59b090
--- /dev/null
+++ b/iox_catalog/migrations/20230524151854_add_parquet_file_partition_and_deleted_index.sql
@@ -0,0 +1,13 @@
+-- Add an index to help the querier when it searches for undeleted parquet files.
+
+-- By default we often only have 5min to finish our statements. `CREATE INDEX CONCURRENTLY`, however, takes longer.
+-- In our prod test this took about 15min, but better safe than sorry.
+-- IOX_NO_TRANSACTION
+SET statement_timeout TO '60min';
+
+-- IOX_STEP_BOUNDARY
+
+-- While `CONCURRENTLY` means it runs in parallel to other writes, this command will only finish after the index has
+-- been successfully built.
+-- IOX_NO_TRANSACTION
+CREATE INDEX CONCURRENTLY IF NOT EXISTS parquet_file_table_delete_idx ON parquet_file (table_id) WHERE to_delete IS NULL;
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index ad04c1963f..b9055692ce 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -253,7 +253,7 @@ pub trait NamespaceRepo: Send + Sync {
     /// Specify `None` for `retention_period_ns` to get infinite retention.
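    ///
    /// A rough usage sketch (hypothetical setup; `repo` stands for any implementation of this trait):
    ///
    /// ```ignore
    /// let name = NamespaceName::new("my_namespace")?;
    /// // `None, None`: default partition template and infinite retention.
    /// let ns = repo.create(&name, None, None).await?;
    /// ```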
     async fn create(
         &mut self,
-        name: &NamespaceName,
+        name: &NamespaceName<'_>,
         partition_template: Option,
         retention_period_ns: Option,
     ) -> Result;
diff --git a/iox_catalog/src/kafkaless_transition.rs b/iox_catalog/src/kafkaless_transition.rs
index fa89731d3c..4216216e37 100644
--- a/iox_catalog/src/kafkaless_transition.rs
+++ b/iox_catalog/src/kafkaless_transition.rs
@@ -23,7 +23,7 @@ impl ShardId {
 }
 
 impl std::fmt::Display for ShardId {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{}", self.0)
     }
 }
@@ -43,7 +43,7 @@ impl ShardIndex {
 }
 
 impl std::fmt::Display for ShardIndex {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{}", self.0)
     }
 }
diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs
index 498a40464b..f47b2eaf4c 100644
--- a/iox_catalog/src/lib.rs
+++ b/iox_catalog/src/lib.rs
@@ -1,6 +1,7 @@
 //! The IOx catalog keeps track of the namespaces, tables, columns, parquet files,
 //! and deletes in the system. Configuration information for distributing ingest, query
 //! and compaction is also stored here.
+#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
 #![warn(
     missing_copy_implementations,
     missing_debug_implementations,
@@ -40,6 +41,7 @@ pub mod interface;
 pub(crate) mod kafkaless_transition;
 pub mod mem;
 pub mod metrics;
+pub mod migrate;
 pub mod postgres;
 pub mod sqlite;
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index fd8c26d20d..1443362e32 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -142,7 +142,7 @@ impl RepoCollection for MemTxn {
 impl NamespaceRepo for MemTxn {
     async fn create(
         &mut self,
-        name: &NamespaceName,
+        name: &NamespaceName<'_>,
         partition_template: Option,
         retention_period_ns: Option,
     ) -> Result {
diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs
index 4d996f39b4..6ea7a96b5b 100644
--- a/iox_catalog/src/metrics.rs
+++ b/iox_catalog/src/metrics.rs
@@ -132,7 +132,7 @@ macro_rules! decorate {
 decorate!(
     impl_trait = NamespaceRepo,
     methods = [
-        "namespace_create" = create(&mut self, name: &NamespaceName, partition_template: Option, retention_period_ns: Option) -> Result;
+        "namespace_create" = create(&mut self, name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option) -> Result;
         "namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option) -> Result;
         "namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result>;
         "namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result>;
diff --git a/iox_catalog/src/migrate.rs b/iox_catalog/src/migrate.rs
new file mode 100644
index 0000000000..a25277e9a9
--- /dev/null
+++ b/iox_catalog/src/migrate.rs
@@ -0,0 +1,907 @@
+//! Better migrations.
+//!
+//! # Why
+//! SQLx migrations don't work for us, see:
+//!
+//! - 
+//! - 
+//!
+//! # Usage
+//! Just place your migrations in the `migrations` folder. They basically work like normal SQLx migrations, but there
+//! are a few extra magic comments you can put in your migration files to modify the behavior.
+//!
+//! ## Steps
+//! The entire SQL text will be executed as a single statement. However, you can split it into multiple steps by using a marker:
+//!
+//! ```sql
+//! CREATE TABLE t1 (x INT);
+//!
+//! -- IOX_STEP_BOUNDARY
+//!
+//! CREATE TABLE t2 (x INT);
+//! ```
+//!
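+//! Each chunk between markers becomes one migration step that is executed separately. A rough sketch of what the
+//! splitter (see `From<&Migration> for IOxMigration` below) produces for the example above; the `assert_eq!` is
+//! purely illustrative:
+//!
+//! ```ignore
+//! let steps: Vec<&str> = sql.split("-- IOX_STEP_BOUNDARY").map(str::trim).collect();
+//! assert_eq!(steps, vec!["CREATE TABLE t1 (x INT);", "CREATE TABLE t2 (x INT);"]);
+//! ```
+//!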
+//! ## Transactions
+//! Each step will be executed in its own transaction. However, you can opt out of this:
+//!
+//! ```sql
+//! -- this step is wrapped in a transaction
+//! CREATE TABLE t1 (x INT);
+//!
+//! -- IOX_STEP_BOUNDARY
+//!
+//! -- this step isn't
+//! -- IOX_NO_TRANSACTION
+//! CREATE TABLE t2 (x INT);
+//! ```
+//!
+//! ## Non-SQL steps
+//! At the moment we only support SQL-based migration steps, but other step types can easily be added.

+use std::{
+    borrow::Cow,
+    collections::{HashMap, HashSet},
+    hash::{Hash, Hasher},
+    ops::Deref,
+    time::{Duration, Instant},
+};
+
+use async_trait::async_trait;
+use observability_deps::tracing::info;
+use siphasher::sip::SipHasher13;
+use sqlx::{
+    migrate::{AppliedMigration, Migrate, MigrateError, Migration, MigrationType, Migrator},
+    query, query_scalar, Acquire, Connection, Executor, PgConnection,
+};
+
+/// A single [`IOxMigration`] step.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum IOxMigrationStep {
+    /// Execute a SQL statement.
+    ///
+    /// A SQL statement MAY contain multiple sub-statements, e.g.:
+    ///
+    /// ```sql
+    /// CREATE TABLE IF NOT EXISTS table1 (
+    ///     id BIGINT GENERATED ALWAYS AS IDENTITY,
+    ///     PRIMARY KEY (id)
+    /// );
+    ///
+    /// CREATE TABLE IF NOT EXISTS table2 (
+    ///     id BIGINT GENERATED ALWAYS AS IDENTITY,
+    ///     PRIMARY KEY (id)
+    /// );
+    /// ```
+    SqlStatement {
+        /// The SQL text.
+        ///
+        /// If [`in_transaction`](Self::SqlStatement::in_transaction) is set, this MUST NOT contain any transaction
+        /// modifiers like `COMMIT`/`ROLLBACK`/`BEGIN`!
+        sql: Cow<'static, str>,
+
+        /// Should the execution of the SQL text be wrapped into a transaction?
+        ///
+        /// Whenever possible, you likely want to set this to `true`. However some database changes like `CREATE INDEX
+        /// CONCURRENTLY` under PostgreSQL cannot be executed within a transaction.
+        in_transaction: bool,
+    },
+}
+
+impl IOxMigrationStep {
+    /// Apply migration step.
+    async fn apply<C>(&self, conn: &mut C) -> Result<(), MigrateError>
+    where
+        C: IOxMigrate,
+    {
+        match self {
+            Self::SqlStatement {
+                sql,
+                in_transaction,
+            } => {
+                if *in_transaction {
+                    conn.exec_with_transaction(sql).await?;
+                } else {
+                    conn.exec_without_transaction(sql).await?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Database migration.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct IOxMigration {
+    /// Version.
+    ///
+    /// This is used to order migrations.
+    pub version: i64,
+
+    /// Human-readable description.
+    pub description: Cow<'static, str>,
+
+    /// Steps that compose this migration.
+    ///
+    /// In most cases you want a single [SQL step](IOxMigrationStep::SqlStatement) which is executed
+    /// [in a transaction](IOxMigrationStep::SqlStatement::in_transaction).
+    pub steps: Vec<IOxMigrationStep>,
+
+    /// Checksum of the given steps.
+    pub checksum: Cow<'static, [u8]>,
+}
+
+impl IOxMigration {
+    /// Apply migration and return elapsed wall-clock time (measured locally).
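+    ///
+    /// Steps are executed strictly in order. If a step fails, the remaining steps are skipped and the migration
+    /// stays registered as "not finished" (i.e. dirty), because [`IOxMigrate::finish_migration`] is never reached.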
+    async fn apply<C>(&self, conn: &mut C) -> Result<Duration, MigrateError>
+    where
+        C: IOxMigrate,
+    {
+        info!(
+            version = self.version,
+            description = self.description.as_ref(),
+            steps = self.steps.len(),
+            "applying migration"
+        );
+        let start = Instant::now();
+
+        conn.start_migration(self).await?;
+
+        for (i, step) in self.steps.iter().enumerate() {
+            info!(
+                version = self.version,
+                step = i + 1,
+                "applying migration step"
+            );
+            step.apply(conn).await?;
+        }
+
+        let elapsed = start.elapsed();
+        conn.finish_migration(self, elapsed).await?;
+
+        info!(
+            version = self.version,
+            description = self.description.as_ref(),
+            steps = self.steps.len(),
+            elapsed_secs = elapsed.as_secs_f64(),
+            "migration applied"
+        );
+
+        Ok(elapsed)
+    }
+}
+
+impl From<&Migration> for IOxMigration {
+    fn from(migration: &Migration) -> Self {
+        assert!(
+            migration.migration_type == MigrationType::Simple,
+            "migration type has to be simple but is {:?}",
+            migration.migration_type,
+        );
+
+        let steps = migration
+            .sql
+            .split("-- IOX_STEP_BOUNDARY")
+            .map(|sql| {
+                let sql = sql.trim().to_owned();
+                let in_transaction = !sql.contains("IOX_NO_TRANSACTION");
+                IOxMigrationStep::SqlStatement {
+                    sql: sql.into(),
+                    in_transaction,
+                }
+            })
+            .collect();
+        Self {
+            version: migration.version,
+            description: migration.description.clone(),
+            steps,
+            // Keep original (unprocessed) checksum for backwards compatibility.
+            checksum: migration.checksum.clone(),
+        }
+    }
+}
+
+/// Migration manager.
+#[derive(Debug, PartialEq, Eq)]
+pub struct IOxMigrator {
+    /// List of migrations.
+    migrations: Vec<IOxMigration>,
+}
+
+impl IOxMigrator {
+    /// Create new migrator.
+    ///
+    /// # Panics
+    /// Panics if migrations are not sorted or if there are duplicate [versions](IOxMigration::version).
+    pub fn new(migrations: impl IntoIterator<Item = IOxMigration>) -> Self {
+        let migrations = migrations.into_iter().collect::<Vec<_>>();
+
+        if let Some(m) = migrations.windows(2).find(|m| m[0].version > m[1].version) {
+            panic!(
+                "migrations are not sorted: version {} is before {} but should not",
+                m[0].version, m[1].version,
+            );
+        }
+        if let Some(m) = migrations.windows(2).find(|m| m[0].version == m[1].version) {
+            panic!(
+                "migrations are not not unique: version {} found twice",
+                m[0].version,
+            );
+        }
+
+        Self { migrations }
+    }
+
+    /// Run migrator on a connection/pool.
+    ///
+    /// Returns the set of executed [migrations](IOxMigration).
+    ///
+    /// This may fail with some migrations already applied. It is also possible that a single migration fails
+    /// half-way, in which case it is marked as dirty. Subsequent runs will fail until the issue is resolved.
+    pub async fn run<'a, A>(&self, migrator: A) -> Result<HashSet<i64>, MigrateError>
+    where
+        A: Acquire<'a> + Send,
+        <A::Connection as Deref>::Target: IOxMigrate,
+    {
+        let mut conn = migrator.acquire().await?;
+        self.run_direct(&mut *conn).await
+    }
+
+    /// Run migrator on an open connection.
+    ///
+    /// See docs for [run](Self::run).
+    async fn run_direct<C>(&self, conn: &mut C) -> Result<HashSet<i64>, MigrateError>
+    where
+        C: IOxMigrate,
+    {
+        let lock_id = conn.generate_lock_id().await?;
+        <C as IOxMigrate>::lock(conn, lock_id).await?;
+
+        // creates [_migrations] table only if needed
+        // eventually this will likely migrate previous versions of the table
+        conn.ensure_migrations_table().await?;
+
+        let version = conn.dirty_version().await?;
+        if let Some(version) = version {
+            // We currently assume that migrations are NOT idempotent and hence we cannot re-apply them.
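+            // Bailing out here means a human has to intervene (see "Failed / Dirty Migrations" in the iox_catalog
+            // README): revert the partial effects and delete the dirty row from `_sqlx_migrations`.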
+ return Err(MigrateError::Dirty(version)); + } + + let applied_migrations = conn.list_applied_migrations().await?; + validate_applied_migrations(&applied_migrations, self)?; + + let applied_migrations: HashSet<_> = + applied_migrations.into_iter().map(|m| m.version).collect(); + + let mut new_migrations = HashSet::new(); + for migration in &self.migrations { + if applied_migrations.contains(&migration.version) { + continue; + } + migration.apply(conn).await?; + new_migrations.insert(migration.version); + } + + // unlock the migrator to allow other migrators to run + // but do nothing as we already migrated + ::unlock(conn, lock_id).await?; + + Ok(new_migrations) + } +} + +impl From<&Migrator> for IOxMigrator { + fn from(migrator: &Migrator) -> Self { + assert!( + !migrator.ignore_missing, + "`Migragtor::ignore_missing` MUST NOT be set" + ); + assert!(migrator.locking, "`Migrator::locking` MUST be set"); + + let migrations = migrator + .migrations + .iter() + .map(|migration| migration.into()) + .collect::>(); + + Self::new(migrations) + } +} + +/// Validate an already-applied migration. +/// +/// Checks that: +/// +/// - applied migration is known +/// - checksum of applied migration and known migration match +fn validate_applied_migrations( + applied_migrations: &[AppliedMigration], + migrator: &IOxMigrator, +) -> Result<(), MigrateError> { + let migrations: HashMap<_, _> = migrator.migrations.iter().map(|m| (m.version, m)).collect(); + + for applied_migration in applied_migrations { + match migrations.get(&applied_migration.version) { + None => { + return Err(MigrateError::VersionMissing(applied_migration.version)); + } + Some(migration) => { + if migration.checksum != applied_migration.checksum { + return Err(MigrateError::VersionMismatch(migration.version)); + } + } + } + } + + Ok(()) +} + +/// Interface of a specific database implementation (like Postgres) and the IOx migration system. +/// +/// This mostly delegates to the SQLx [`Migrate`] interface but also has some extra methods. +#[async_trait] +pub trait IOxMigrate: Migrate + Send { + /// Generate a lock ID that is used for [`lock`](Self::lock) and [`unlock`](Self::unlock). + async fn generate_lock_id(&mut self) -> Result; + + /// Lock database for migrations. + async fn lock(&mut self, lock_id: i64) -> Result<(), MigrateError>; + + /// Unlock database after migration. + async fn unlock(&mut self, lock_id: i64) -> Result<(), MigrateError>; + + /// Start a migration and mark it as "not finished". + async fn start_migration(&mut self, migration: &IOxMigration) -> Result<(), MigrateError>; + + /// Finish a migration and register the elapsed time. + async fn finish_migration( + &mut self, + migration: &IOxMigration, + elapsed: Duration, + ) -> Result<(), MigrateError>; + + /// Execute a SQL statement (that may contain multiple sub-statements) within a transaction block. + /// + /// Note that the SQL text can in theory contain `BEGIN`/`COMMIT` commands but shouldn't. + async fn exec_with_transaction(&mut self, sql: &str) -> Result<(), MigrateError>; + + /// Execute a SQL statement (that may contain multiple sub-statements) without a transaction block. + async fn exec_without_transaction(&mut self, sql: &str) -> Result<(), MigrateError>; +} + +#[async_trait] +impl IOxMigrate for PgConnection { + async fn generate_lock_id(&mut self) -> Result { + let db: String = query_scalar("SELECT current_database()") + .fetch_one(self) + .await?; + + // A randomly generated static siphash key to ensure all migrations use the same locks. 
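+        // The key is hardcoded so that every binary hashes a given database name to the same lock ID; if two
+        // migrators used different keys, they would acquire different advisory locks and could run concurrently.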
+ // + // Generated with: xxd -i -l 16 /dev/urandom + let key = [ + 0xb8, 0x52, 0x81, 0x3c, 0x12, 0x83, 0x6f, 0xd9, 0x00, 0x4f, 0xe7, 0xe3, 0x61, 0xbd, + 0x03, 0xaf, + ]; + + let mut hasher = SipHasher13::new_with_key(&key); + db.hash(&mut hasher); + + Ok(i64::from_ne_bytes(hasher.finish().to_ne_bytes())) + } + + async fn lock(&mut self, lock_id: i64) -> Result<(), MigrateError> { + loop { + let is_locked: bool = query_scalar("SELECT pg_try_advisory_lock($1)") + .bind(lock_id) + .fetch_one(&mut *self) + .await?; + + if is_locked { + return Ok(()); + } + + tokio::time::sleep(Duration::from_millis(20)).await; + } + } + + async fn unlock(&mut self, lock_id: i64) -> Result<(), MigrateError> { + let _ = query("SELECT pg_advisory_unlock($1)") + .bind(lock_id) + .execute(self) + .await?; + + Ok(()) + } + + async fn start_migration(&mut self, migration: &IOxMigration) -> Result<(), MigrateError> { + let _ = query( + r#" +INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time ) +VALUES ( $1, $2, FALSE, $3, -1 ) + "#, + ) + .bind(migration.version) + .bind(&*migration.description) + .bind(&*migration.checksum) + .execute(self) + .await?; + + Ok(()) + } + + async fn finish_migration( + &mut self, + migration: &IOxMigration, + elapsed: Duration, + ) -> Result<(), MigrateError> { + let _ = query( + r#" +UPDATE _sqlx_migrations +SET success = TRUE, execution_time = $1 +WHERE version = $2 + "#, + ) + .bind(elapsed.as_nanos() as i64) + .bind(migration.version) + .execute(self) + .await?; + + Ok(()) + } + + async fn exec_with_transaction(&mut self, sql: &str) -> Result<(), MigrateError> { + let mut tx = ::begin(self).await?; + let _ = tx.execute(sql).await?; + tx.commit().await?; + Ok(()) + } + + async fn exec_without_transaction(&mut self, sql: &str) -> Result<(), MigrateError> { + let _ = self.execute(sql).await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + mod generic { + use super::*; + + #[test] + #[should_panic( + expected = "migrations are not sorted: version 2 is before 1 but should not" + )] + fn test_migrator_new_panic_not_sorted() { + IOxMigrator::new([ + IOxMigration { + version: 2, + description: "".into(), + steps: vec![], + checksum: vec![].into(), + }, + IOxMigration { + version: 1, + description: "".into(), + steps: vec![], + checksum: vec![].into(), + }, + ]); + } + + #[test] + #[should_panic(expected = "migrations are not not unique: version 2 found twice")] + fn test_migrator_new_panic_not_unique() { + IOxMigrator::new([ + IOxMigration { + version: 2, + description: "".into(), + steps: vec![], + checksum: vec![].into(), + }, + IOxMigration { + version: 2, + description: "".into(), + steps: vec![], + checksum: vec![].into(), + }, + ]); + } + + #[test] + #[should_panic(expected = "`Migrator::locking` MUST be set")] + fn test_convert_migrator_from_sqlx_panic_no_locking() { + let _ = IOxMigrator::from(&Migrator { + migrations: vec![].into(), + ignore_missing: false, + locking: false, + }); + } + + #[test] + #[should_panic(expected = "`Migragtor::ignore_missing` MUST NOT be set")] + fn test_convert_migrator_from_sqlx_panic_ignore_missing() { + let _ = IOxMigrator::from(&Migrator { + migrations: vec![].into(), + ignore_missing: true, + locking: true, + }); + } + + #[test] + #[should_panic(expected = "migration type has to be simple but is ReversibleUp")] + fn test_convert_migrator_from_sqlx_panic_invalid_migration_type_rev_up() { + let _ = IOxMigrator::from(&Migrator { + migrations: vec![Migration { + version: 1, + description: "".into(), + 
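+                    // Reversible (up/down) migrations come from `*.up.sql`/`*.down.sql` file pairs; IOx only
+                    // supports forward-only `MigrationType::Simple` migrations, hence the expected panic.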
migration_type: MigrationType::ReversibleUp, + sql: "".into(), + checksum: vec![].into(), + }] + .into(), + ignore_missing: false, + locking: true, + }); + } + + #[test] + #[should_panic(expected = "migration type has to be simple but is ReversibleDown")] + fn test_convert_migrator_from_sqlx_panic_invalid_migration_type_rev_down() { + let _ = IOxMigrator::from(&Migrator { + migrations: vec![Migration { + version: 1, + description: "".into(), + migration_type: MigrationType::ReversibleDown, + sql: "".into(), + checksum: vec![].into(), + }] + .into(), + ignore_missing: false, + locking: true, + }); + } + + #[test] + fn test_convert_migrator_from_sqlx_ok() { + let actual = IOxMigrator::from(&Migrator { + migrations: vec![ + Migration { + version: 1, + description: "some descr".into(), + migration_type: MigrationType::Simple, + sql: "SELECT 1;".into(), + checksum: vec![1, 2, 3].into(), + }, + Migration { + version: 10, + description: "more descr".into(), + migration_type: MigrationType::Simple, + sql: "SELECT 2;\n-- IOX_STEP_BOUNDARY\n-- IOX_NO_TRANSACTION\nSELECT 3;" + .into(), + checksum: vec![4, 5, 6].into(), + }, + ] + .into(), + ignore_missing: false, + locking: true, + }); + + let expected = IOxMigrator { + migrations: vec![ + IOxMigration { + version: 1, + description: "some descr".into(), + steps: vec![IOxMigrationStep::SqlStatement { + sql: "SELECT 1;".into(), + in_transaction: true, + }], + checksum: vec![1, 2, 3].into(), + }, + IOxMigration { + version: 10, + description: "more descr".into(), + steps: vec![ + IOxMigrationStep::SqlStatement { + sql: "SELECT 2;".into(), + in_transaction: true, + }, + IOxMigrationStep::SqlStatement { + sql: "-- IOX_NO_TRANSACTION\nSELECT 3;".into(), + in_transaction: false, + }, + ], + checksum: vec![4, 5, 6].into(), + }, + ], + }; + + assert_eq!(actual, expected); + } + } + + mod postgres { + use std::sync::Arc; + + use futures::{stream::FuturesUnordered, StreamExt}; + use sqlx::{pool::PoolConnection, Postgres}; + use test_helpers::maybe_start_logging; + + use crate::postgres::test_utils::{maybe_skip_integration, setup_db_no_migration}; + + use super::*; + + #[tokio::test] + async fn test_step_sql_statement_no_transaction() { + maybe_skip_integration!(); + let mut conn = setup().await; + let conn = &mut *conn; + + conn.execute("CREATE TABLE t (x INT);").await.unwrap(); + + let create_index_concurrently = "CREATE INDEX CONCURRENTLY i ON t (x);"; + + // `CREATE INDEX CONCURRENTLY` is NOT possible w/ a transaction. Verify that. + IOxMigrationStep::SqlStatement { + sql: create_index_concurrently.into(), + in_transaction: true, + } + .apply(conn) + .await + .unwrap_err(); + + // ... but it IS possible w/o a transaction. 
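+            // (Postgres rejects `CREATE INDEX CONCURRENTLY` inside a transaction block because the command
+            // internally commits multiple times while building the index.)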
+ IOxMigrationStep::SqlStatement { + sql: create_index_concurrently.into(), + in_transaction: false, + } + .apply(conn) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_migrator_happy_path() { + maybe_skip_integration!(); + let mut conn = setup().await; + let conn = &mut *conn; + + let migrator = IOxMigrator::new([ + IOxMigration { + version: 1, + description: "".into(), + steps: vec![ + IOxMigrationStep::SqlStatement { + sql: "CREATE TABLE t (x INT);".into(), + in_transaction: false, + }, + IOxMigrationStep::SqlStatement { + sql: "INSERT INTO t (x) VALUES (1); INSERT INTO t (x) VALUES (10);" + .into(), + in_transaction: true, + }, + ], + checksum: vec![].into(), + }, + IOxMigration { + version: 2, + description: "".into(), + steps: vec![IOxMigrationStep::SqlStatement { + sql: "INSERT INTO t (x) VALUES (100);".into(), + in_transaction: true, + }], + checksum: vec![].into(), + }, + ]); + + let applied = migrator.run_direct(conn).await.unwrap(); + assert_eq!(applied, HashSet::from([1, 2])); + + let r = sqlx::query_as::<_, Res>("SELECT SUM(x)::INT AS r FROM t;") + .fetch_one(conn) + .await + .unwrap() + .r; + + assert_eq!(r, 111); + } + + #[tokio::test] + async fn test_migrator_only_apply_new_migrations() { + maybe_skip_integration!(); + let mut conn = setup().await; + let conn = &mut *conn; + + let migrator = IOxMigrator::new([IOxMigration { + version: 1, + description: "".into(), + steps: vec![IOxMigrationStep::SqlStatement { + // NOT idempotent! + sql: "CREATE TABLE t (x INT);".into(), + in_transaction: false, + }], + checksum: vec![].into(), + }]); + + let applied = migrator.run_direct(conn).await.unwrap(); + assert_eq!(applied, HashSet::from([1])); + + let migrator = + IOxMigrator::new(migrator.migrations.iter().cloned().chain([IOxMigration { + version: 2, + description: "".into(), + steps: vec![IOxMigrationStep::SqlStatement { + // NOT idempotent! 
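+                        // (plain `CREATE TABLE` without `IF NOT EXISTS`): re-running it would fail, so this
+                        // only passes if the migrator really skips already-applied migrations.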
+ sql: "CREATE TABLE s (x INT);".into(), + in_transaction: false, + }], + checksum: vec![].into(), + }])); + + let applied = migrator.run_direct(conn).await.unwrap(); + assert_eq!(applied, HashSet::from([2])); + + let applied = migrator.run_direct(conn).await.unwrap(); + assert_eq!(applied, HashSet::from([])); + } + + #[tokio::test] + async fn test_migrator_fail_migration_missing() { + maybe_skip_integration!(); + let mut conn = setup().await; + let conn = &mut *conn; + + let migrator = IOxMigrator::new([IOxMigration { + version: 1, + description: "".into(), + steps: vec![], + checksum: vec![].into(), + }]); + + migrator.run_direct(conn).await.unwrap(); + + let migrator = IOxMigrator::new([IOxMigration { + version: 2, + description: "".into(), + steps: vec![], + checksum: vec![].into(), + }]); + + let err = migrator.run_direct(conn).await.unwrap_err(); + assert_eq!( + err.to_string(), + "migration 1 was previously applied but is missing in the resolved migrations" + ); + } + + #[tokio::test] + async fn test_migrator_fail_checksum_mismatch() { + maybe_skip_integration!(); + let mut conn = setup().await; + let conn = &mut *conn; + + let migrator = IOxMigrator::new([IOxMigration { + version: 1, + description: "".into(), + steps: vec![], + checksum: vec![1, 2, 3].into(), + }]); + + migrator.run_direct(conn).await.unwrap(); + + let migrator = IOxMigrator::new([IOxMigration { + version: 1, + description: "".into(), + steps: vec![], + checksum: vec![4, 5, 6].into(), + }]); + + let err = migrator.run_direct(conn).await.unwrap_err(); + assert_eq!( + err.to_string(), + "migration 1 was previously applied but has been modified" + ); + } + + #[tokio::test] + async fn test_migrator_fail_dirty() { + maybe_skip_integration!(); + let mut conn = setup().await; + let conn = &mut *conn; + + let migrator = IOxMigrator::new([IOxMigration { + version: 1, + description: "".into(), + steps: vec![IOxMigrationStep::SqlStatement { + sql: "foo".into(), + in_transaction: false, + }], + checksum: vec![1, 2, 3].into(), + }]); + + migrator.run_direct(conn).await.unwrap_err(); + + let migrator = IOxMigrator::new([IOxMigration { + version: 1, + description: "".into(), + steps: vec![], + // same checksum, but now w/ valid steps (to simulate a once failed SQL statement) + checksum: vec![1, 2, 3].into(), + }]); + + let err = migrator.run_direct(conn).await.unwrap_err(); + assert_eq!( + err.to_string(), + "migration 1 is partially applied; fix and remove row from `_sqlx_migrations` table" + ); + } + + /// Tests that `CREATE INDEX CONCURRENTLY` doesn't deadlock. + /// + /// Originally we used SQLx to acquire the locks which uses `pg_advisory_lock`. However this seems to acquire a + /// global "shared lock". Other migration frameworks faced the same issue and use `pg_try_advisory_lock` + /// instead. 
Also see: + /// + /// - + /// - + #[tokio::test] + async fn test_locking() { + const N_TABLES_AND_INDICES: usize = 10; + const N_CONCURRENT_MIGRATIONS: usize = 100; + + maybe_skip_integration!(); + maybe_start_logging(); + let pool = setup_db_no_migration().await.into_pool(); + + let migrator = Arc::new(IOxMigrator::new((0..N_TABLES_AND_INDICES).map(|i| { + IOxMigration { + version: i as i64, + description: "".into(), + steps: vec![ + IOxMigrationStep::SqlStatement { + sql: format!("CREATE TABLE t{i} (x INT);").into(), + in_transaction: false, + }, + IOxMigrationStep::SqlStatement { + sql: format!("CREATE INDEX CONCURRENTLY i{i} ON t{i} (x);").into(), + in_transaction: false, + }, + ], + checksum: vec![].into(), + } + }))); + + let mut futures: FuturesUnordered<_> = (0..N_CONCURRENT_MIGRATIONS) + .map(move |_| { + let migrator = Arc::clone(&migrator); + let pool = pool.clone(); + async move { + // pool might timeout, so add another retry loop around it + let mut conn = loop { + let pool = pool.clone(); + if let Ok(conn) = pool.acquire().await { + break conn; + } + }; + let conn = &mut *conn; + migrator.run_direct(conn).await.unwrap(); + } + }) + .collect(); + while futures.next().await.is_some() {} + } + + async fn setup() -> PoolConnection { + maybe_start_logging(); + + let pool = setup_db_no_migration().await.into_pool(); + pool.acquire().await.unwrap() + } + + #[derive(sqlx::FromRow)] + struct Res { + r: i32, + } + } +} diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 7f0a9feeed..7e89021136 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -11,6 +11,7 @@ use crate::{ TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }, metrics::MetricDecorator, + migrate::IOxMigrator, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, }; use async_trait::async_trait; @@ -127,6 +128,11 @@ impl PostgresCatalog { fn schema_name(&self) -> &str { &self.options.schema_name } + + #[cfg(test)] + pub(crate) fn into_pool(self) -> HotSwapPool { + self.pool + } } impl Display for PostgresCatalog { @@ -233,7 +239,8 @@ impl Catalog for PostgresCatalog { .await .map_err(|e| Error::Setup { source: e })?; - MIGRATOR + let migrator = IOxMigrator::from(&MIGRATOR); + migrator .run(&self.pool) .await .map_err(|e| Error::Setup { source: e.into() })?; @@ -498,7 +505,7 @@ impl RepoCollection for PostgresTxn { impl NamespaceRepo for PostgresTxn { async fn create( &mut self, - name: &NamespaceName, + name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option, ) -> Result { @@ -1614,36 +1621,32 @@ fn is_fk_violation(e: &sqlx::Error) -> bool { false } +/// Test helpers postgres testing. #[cfg(test)] -mod tests { +pub(crate) mod test_utils { use super::*; - use crate::test_helpers::{arbitrary_namespace, arbitrary_table}; - use assert_matches::assert_matches; - use data_types::{partition_template::TemplatePart, ColumnId, ColumnSet}; - use generated_types::influxdata::iox::partition_template::v1 as proto; - use metric::{Attributes, DurationHistogram, Metric}; use rand::Rng; use sqlx::migrate::MigrateDatabase; - use std::{env, io::Write, sync::Arc, time::Instant}; - use tempfile::NamedTempFile; - // Helper macro to skip tests if TEST_INTEGRATION and TEST_INFLUXDB_IOX_CATALOG_DSN environment - // variables are not set. + pub const TEST_DSN_ENV: &str = "TEST_INFLUXDB_IOX_CATALOG_DSN"; + + /// Helper macro to skip tests if TEST_INTEGRATION and TEST_INFLUXDB_IOX_CATALOG_DSN environment + /// variables are not set. macro_rules! 
maybe_skip_integration { ($panic_msg:expr) => {{ dotenvy::dotenv().ok(); - let required_vars = ["TEST_INFLUXDB_IOX_CATALOG_DSN"]; + let required_vars = [crate::postgres::test_utils::TEST_DSN_ENV]; let unset_vars: Vec<_> = required_vars .iter() - .filter_map(|&name| match env::var(name) { + .filter_map(|&name| match std::env::var(name) { Ok(_) => None, Err(_) => Some(name), }) .collect(); let unset_var_names = unset_vars.join(", "); - let force = env::var("TEST_INTEGRATION"); + let force = std::env::var("TEST_INTEGRATION"); if force.is_ok() && !unset_var_names.is_empty() { panic!( @@ -1674,19 +1677,9 @@ mod tests { }; } - fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) { - let histogram = metrics - .get_instrument::>("catalog_op_duration") - .expect("failed to read metric") - .get_observer(&Attributes::from(&[("op", name), ("result", "success")])) - .expect("failed to get observer") - .fetch(); + pub(crate) use maybe_skip_integration; - let hit_count = histogram.sample_count(); - assert!(hit_count > 0, "metric did not record any calls"); - } - - async fn create_db(dsn: &str) { + pub async fn create_db(dsn: &str) { // Create the catalog database if it doesn't exist if !Postgres::database_exists(dsn).await.unwrap() { // Ignore failure if another test has already created the database @@ -1694,7 +1687,7 @@ mod tests { } } - async fn setup_db() -> PostgresCatalog { + pub async fn setup_db_no_migration() -> PostgresCatalog { // create a random schema for this particular pool let schema_name = { // use scope to make it clear to clippy / rust that `rng` is @@ -1741,10 +1734,59 @@ mod tests { .await .expect("failed to grant privileges to schema"); + pg + } + + pub async fn setup_db() -> PostgresCatalog { + let pg = setup_db_no_migration().await; // Run the migrations against this random schema. 
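        // `setup()` goes through `Catalog::setup`, which wraps the embedded sqlx `MIGRATOR` in an `IOxMigrator`
        // (see `IOxMigrator::from(&MIGRATOR)` above), so this also exercises step splitting and advisory locking.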
pg.setup().await.expect("failed to initialise database"); pg } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + postgres::test_utils::{ + create_db, maybe_skip_integration, setup_db, setup_db_no_migration, + }, + test_helpers::{arbitrary_namespace, arbitrary_table}, + }; + use assert_matches::assert_matches; + use data_types::{partition_template::TemplatePart, ColumnId, ColumnSet}; + use generated_types::influxdata::iox::partition_template::v1 as proto; + use metric::{Attributes, DurationHistogram, Metric}; + use std::{io::Write, sync::Arc, time::Instant}; + use tempfile::NamedTempFile; + use test_helpers::maybe_start_logging; + + fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) { + let histogram = metrics + .get_instrument::>("catalog_op_duration") + .expect("failed to read metric") + .get_observer(&Attributes::from(&[("op", name), ("result", "success")])) + .expect("failed to get observer") + .fetch(); + + let hit_count = histogram.sample_count(); + assert!(hit_count > 0, "metric did not record any calls"); + } + + #[tokio::test] + async fn test_migration() { + maybe_skip_integration!(); + maybe_start_logging(); + + let postgres = setup_db_no_migration().await; + + // 1st setup + postgres.setup().await.unwrap(); + + // 2nd setup + postgres.setup().await.unwrap(); + } #[tokio::test] async fn test_catalog() { diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index c1d48006bc..b4efcd6119 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -261,7 +261,7 @@ impl RepoCollection for SqliteTxn { impl NamespaceRepo for SqliteTxn { async fn create( &mut self, - name: &NamespaceName, + name: &NamespaceName<'_>, partition_template: Option, retention_period_ns: Option, ) -> Result {