refactor: add `parquet_file` PG index for querier (#7842)

* refactor: add `parquet_file` PG index for querier

Currently the `list_by_table_not_to_delete` catalog query is somewhat
expensive:

```text
iox_catalog_prod=> select table_id, sum((to_delete is NULL)::int) as n from parquet_file group by table_id order by n desc limit 5;
 table_id |  n
----------+------
  1489038 | 7221
  1489037 | 7019
  1491534 | 5793
  1491951 | 5522
  1513377 | 5339
(5 rows)

iox_catalog_prod=> EXPLAIN ANALYZE SELECT id, namespace_id, table_id, partition_id, object_store_id,
       min_time, max_time, to_delete, file_size_bytes,
       row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE table_id = 1489038 AND to_delete IS NULL;
                                                                          QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------
 Bitmap Heap Scan on parquet_file  (cost=46050.91..47179.26 rows=283 width=200) (actual time=464.368..472.514 rows=7221 loops=1)
   Recheck Cond: ((table_id = 1489038) AND (to_delete IS NULL))
   Heap Blocks: exact=7152
   ->  BitmapAnd  (cost=46050.91..46050.91 rows=283 width=0) (actual time=463.341..463.343 rows=0 loops=1)
         ->  Bitmap Index Scan on parquet_file_table_idx  (cost=0.00..321.65 rows=22545 width=0) (actual time=1.674..1.674 rows=7221 loops=1)
               Index Cond: (table_id = 1489038)
         ->  Bitmap Index Scan on parquet_file_deleted_at_idx  (cost=0.00..45728.86 rows=1525373 width=0) (actual time=460.717..460.717 rows=4772117 loops=1)
               Index Cond: (to_delete IS NULL)
 Planning Time: 0.092 ms
 Execution Time: 472.907 ms
(10 rows)
```

I think this is partly because PostgreSQL chooses the wrong strategy here: it could
just scan the existing `table_id` index and filter from there:

```text
iox_catalog_prod=> EXPLAIN ANALYZE SELECT id, namespace_id, table_id, partition_id, object_store_id,
       min_time, max_time, to_delete, file_size_bytes,
       row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE table_id = 1489038;
                                                                    QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------
 Index Scan using parquet_file_table_idx on parquet_file  (cost=0.57..86237.78 rows=22545 width=200) (actual time=0.057..6.994 rows=7221 loops=1)
   Index Cond: (table_id = 1489038)
 Planning Time: 0.094 ms
 Execution Time: 7.297 ms
(4 rows)
```

However, PostgreSQL doesn't know the cardinalities well enough to pick that plan. So
let's add a dedicated partial index to make the querier faster.
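
Concretely, the migration added further down in this PR creates a partial index that
matches this predicate exactly, so the `list_by_table_not_to_delete` lookup should
become a plain index scan:

```sql
-- Partial index: only rows with `to_delete IS NULL` are indexed, keyed by `table_id`.
CREATE INDEX CONCURRENTLY IF NOT EXISTS parquet_file_table_delete_idx
    ON parquet_file (table_id)
    WHERE to_delete IS NULL;
```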

* feat: new migration system

* docs: explain dirty migrations
Marco Neumann 2023-05-31 12:56:32 +02:00 committed by GitHub
parent 306171e714
commit e1c1908a0b
12 changed files with 1012 additions and 34 deletions

Cargo.lock generated

@@ -2825,6 +2825,7 @@ dependencies = [
"pretty_assertions",
"rand 0.8.5",
"serde",
"siphasher",
"snafu",
"sqlx",
"sqlx-hotswap-pool",


@@ -16,6 +16,7 @@ mutable_batch = { path = "../mutable_batch" }
observability_deps = { path = "../observability_deps" }
parking_lot = { version = "0.12" }
serde = { version = "1.0", features = ["derive"] }
siphasher = "0.3"
snafu = "0.7"
sqlx = { version = "0.6", features = [ "runtime-tokio-rustls" , "postgres", "uuid", "sqlite" ] }
sqlx-hotswap-pool = { path = "../sqlx-hotswap-pool" }


@@ -73,3 +73,15 @@ basis. As always, there are a few ways to do that:
```
psql 'dbname=iox_shared options=-csearch_path=public,iox_catalog'
```
## Failed / Dirty Migrations
Migrations might be marked as dirty in prod if they do not run all the way through. In this case, you have to clean up manually
(using a read-write shell):
1. Revert the effect of the migration (e.g. drop created tables, drop created indices)
2. Remove the migration from the `_sqlx_migrations` table. E.g. if the version of the migration is 1337, this is:
```sql
DELETE FROM _sqlx_migrations
WHERE version = 1337;
```
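
To find the dirty row in the first place, you can inspect the bookkeeping table directly; the columns below are the ones the
migration code in this PR writes, and dirty rows are the ones left with `success = FALSE`:

```sql
-- List recorded migrations; dirty ones are marked with success = FALSE.
SELECT version, description, success
FROM _sqlx_migrations
ORDER BY version;
```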


@@ -0,0 +1,13 @@
-- Add to help the querier when it searches for undeleted parquet files.
-- By default we often only have 5min to finish our statements. The `CREATE INDEX CONCURRENTLY` however takes longer.
-- In our prod test this took about 15min, but better be safe than sorry.
-- IOX_NO_TRANSACTION
SET statement_timeout TO '60min';
-- IOX_STEP_BOUNDARY
-- While `CONCURRENTLY` means it runs parallel to other writes, this command will only finish after the index was
-- successfully built.
-- IOX_NO_TRANSACTION
CREATE INDEX CONCURRENTLY IF NOT EXISTS parquet_file_table_delete_idx ON parquet_file (table_id) WHERE to_delete IS NULL;


@@ -253,7 +253,7 @@ pub trait NamespaceRepo: Send + Sync {
/// Specify `None` for `retention_period_ns` to get infinite retention.
async fn create(
&mut self,
name: &NamespaceName,
name: &NamespaceName<'_>,
partition_template: Option<NamespacePartitionTemplateOverride>,
retention_period_ns: Option<i64>,
) -> Result<Namespace>;


@@ -23,7 +23,7 @@ impl ShardId {
}
impl std::fmt::Display for ShardId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
@@ -43,7 +43,7 @@ impl ShardIndex {
}
impl std::fmt::Display for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}


@@ -1,6 +1,7 @@
//! The IOx catalog keeps track of the namespaces, tables, columns, parquet files,
//! and deletes in the system. Configuration information for distributing ingest, query
//! and compaction is also stored here.
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
@@ -40,6 +41,7 @@ pub mod interface;
pub(crate) mod kafkaless_transition;
pub mod mem;
pub mod metrics;
pub mod migrate;
pub mod postgres;
pub mod sqlite;


@@ -142,7 +142,7 @@ impl RepoCollection for MemTxn {
impl NamespaceRepo for MemTxn {
async fn create(
&mut self,
name: &NamespaceName,
name: &NamespaceName<'_>,
partition_template: Option<NamespacePartitionTemplateOverride>,
retention_period_ns: Option<i64>,
) -> Result<Namespace> {


@@ -132,7 +132,7 @@ macro_rules! decorate {
decorate!(
impl_trait = NamespaceRepo,
methods = [
"namespace_create" = create(&mut self, name: &NamespaceName, partition_template: Option<NamespacePartitionTemplateOverride>, retention_period_ns: Option<i64>) -> Result<Namespace>;
"namespace_create" = create(&mut self, name: &NamespaceName<'_>, partition_template: Option<NamespacePartitionTemplateOverride>, retention_period_ns: Option<i64>) -> Result<Namespace>;
"namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
"namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>>;
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result<Option<Namespace>>;

iox_catalog/src/migrate.rs Normal file

@@ -0,0 +1,907 @@
//! Better migrations.
//!
//! # Why
//! SQLx migrations don't work for us, see:
//!
//! - <https://github.com/launchbadge/sqlx/issues/2085>
//! - <https://github.com/influxdata/influxdb_iox/issues/5031>
//!
//! # Usage
//! Just place your migrations in the `migrations` folder. They basically work like normal SQLx migrations, but there are
//! a few extra magic comments you can put in your SQL to modify the behavior.
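//!
//! For example, a file like the following (hypothetical name, using the usual SQLx `<version>_<description>.sql` naming)
//! would be picked up automatically:
//!
//! ```text
//! migrations/20230531120000_add_parquet_file_table_delete_idx.sql
//! ```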
//!
//! ## Steps
//! The entire SQL text will be executed as a single statement. However, you can split it into multiple steps by using a marker:
//!
//! ```sql
//! CREATE TABLE t1 (x INT);
//!
//! -- IOX_STEP_BOUNDARY
//!
//! CREATE TABLE t2 (x INT);
//! ```
//!
//! ## Transactions
//! Each step will be executed in its own transaction. However, you can opt out of this:
//!
//! ```sql
//! -- this step is wrapped in a transaction
//! CREATE TABLE t1 (x INT);
//!
//! -- IOX_STEP_BOUNDARY
//!
//! -- this step isn't
//! -- IOX_NO_TRANSACTION
//! CREATE TABLE t2 (x INT);
//! ```
//!
//! ## Non-SQL steps
//! At the moment, we only support SQL-based migration steps, but other step types can easily be added.
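//!
//! ## Constructing a migrator in code
//! A minimal sketch of building an [`IOxMigrator`] by hand (mirroring the pattern used by the tests in this module);
//! in production the Postgres catalog derives it from its embedded SQLx [`Migrator`] via `IOxMigrator::from(&MIGRATOR)`:
//!
//! ```ignore
//! let migrator = IOxMigrator::new([IOxMigration {
//!     version: 1,
//!     description: "create table t".into(),
//!     steps: vec![IOxMigrationStep::SqlStatement {
//!         sql: "CREATE TABLE t (x INT);".into(),
//!         in_transaction: true,
//!     }],
//!     checksum: vec![].into(),
//! }]);
//! ```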
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
hash::{Hash, Hasher},
ops::Deref,
time::{Duration, Instant},
};
use async_trait::async_trait;
use observability_deps::tracing::info;
use siphasher::sip::SipHasher13;
use sqlx::{
migrate::{AppliedMigration, Migrate, MigrateError, Migration, MigrationType, Migrator},
query, query_scalar, Acquire, Connection, Executor, PgConnection,
};
/// A single [`IOxMigration`] step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IOxMigrationStep {
/// Execute a SQL statement.
///
/// A SQL statement MAY contain multiple sub-statements, e.g.:
///
/// ```sql
/// CREATE TABLE IF NOT EXISTS table1 (
/// id BIGINT GENERATED ALWAYS AS IDENTITY,
/// PRIMARY KEY (id)
/// );
///
/// CREATE TABLE IF NOT EXISTS table2 (
/// id BIGINT GENERATED ALWAYS AS IDENTITY,
/// PRIMARY KEY (id)
/// );
/// ```
SqlStatement {
/// The SQL text.
///
/// If [`in_transaction`](Self::SqlStatement::in_transaction) is set, this MUST NOT contain any transaction modifiers like `COMMIT`/`ROLLBACK`/`BEGIN`!
sql: Cow<'static, str>,
/// Should the execution of the SQL text be wrapped into a transaction?
///
/// Whenever possible, you likely want to set this to `true`. However, some database changes like `CREATE INDEX
/// CONCURRENTLY` under PostgreSQL cannot be executed within a transaction.
in_transaction: bool,
},
}
impl IOxMigrationStep {
/// Apply migration step.
async fn apply<C>(&self, conn: &mut C) -> Result<(), MigrateError>
where
C: IOxMigrate,
{
match self {
Self::SqlStatement {
sql,
in_transaction,
} => {
if *in_transaction {
conn.exec_with_transaction(sql).await?;
} else {
conn.exec_without_transaction(sql).await?;
}
}
}
Ok(())
}
}
/// Database migration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IOxMigration {
/// Version.
///
/// This is used to order migrations.
pub version: i64,
/// Human-readable description.
pub description: Cow<'static, str>,
/// Steps that compose this migration.
///
/// In most cases you want a single [SQL step](IOxMigrationStep::SqlStatement) which is executed
/// [in a transaction](IOxMigrationStep::SqlStatement::in_transaction).
pub steps: Vec<IOxMigrationStep>,
/// Checksum of the given steps.
pub checksum: Cow<'static, [u8]>,
}
impl IOxMigration {
/// Apply migration and return elapsed wall-clock time (measured locally).
async fn apply<C>(&self, conn: &mut C) -> Result<Duration, MigrateError>
where
C: IOxMigrate,
{
info!(
version = self.version,
description = self.description.as_ref(),
steps = self.steps.len(),
"applying migration"
);
let start = Instant::now();
conn.start_migration(self).await?;
for (i, step) in self.steps.iter().enumerate() {
info!(
version = self.version,
step = i + 1,
"applying migration step"
);
step.apply(conn).await?;
}
let elapsed = start.elapsed();
conn.finish_migration(self, elapsed).await?;
info!(
version = self.version,
description = self.description.as_ref(),
steps = self.steps.len(),
elapsed_secs = elapsed.as_secs_f64(),
"migration applied"
);
Ok(elapsed)
}
}
impl From<&Migration> for IOxMigration {
fn from(migration: &Migration) -> Self {
assert!(
migration.migration_type == MigrationType::Simple,
"migration type has to be simple but is {:?}",
migration.migration_type,
);
let steps = migration
.sql
.split("-- IOX_STEP_BOUNDARY")
.map(|sql| {
let sql = sql.trim().to_owned();
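// The marker may appear anywhere within a step's text; by convention it is written as a leading
// `-- IOX_NO_TRANSACTION` comment (see the module docs and the new migration in this PR).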
let in_transaction = !sql.contains("IOX_NO_TRANSACTION");
IOxMigrationStep::SqlStatement {
sql: sql.into(),
in_transaction,
}
})
.collect();
Self {
version: migration.version,
description: migration.description.clone(),
steps,
// Keep original (unprocessed) checksum for backwards compatibility.
checksum: migration.checksum.clone(),
}
}
}
/// Migration manager.
#[derive(Debug, PartialEq, Eq)]
pub struct IOxMigrator {
/// List of migrations.
migrations: Vec<IOxMigration>,
}
impl IOxMigrator {
/// Create new migrator.
///
/// # Panics
/// Panics if migrations are not sorted or if there are duplicate [versions](IOxMigration::version).
pub fn new(migrations: impl IntoIterator<Item = IOxMigration>) -> Self {
let migrations = migrations.into_iter().collect::<Vec<_>>();
if let Some(m) = migrations.windows(2).find(|m| m[0].version > m[1].version) {
panic!(
"migrations are not sorted: version {} is before {} but should not",
m[0].version, m[1].version,
);
}
if let Some(m) = migrations.windows(2).find(|m| m[0].version == m[1].version) {
panic!(
"migrations are not not unique: version {} found twice",
m[0].version,
);
}
Self { migrations }
}
/// Run migrator on a connection/pool.
///
/// Returns set of executed [migrations](IOxMigration).
///
/// This may fail with some migrations already applied. It is also possible that a migration itself fails half-way, in
/// which case it is marked as dirty. Subsequent migrations will fail until the issue is resolved.
pub async fn run<'a, A>(&self, migrator: A) -> Result<HashSet<i64>, MigrateError>
where
A: Acquire<'a> + Send,
<A::Connection as Deref>::Target: IOxMigrate,
{
let mut conn = migrator.acquire().await?;
self.run_direct(&mut *conn).await
}
/// Run migrator on an open connection.
///
/// See docs for [run](Self::run).
async fn run_direct<C>(&self, conn: &mut C) -> Result<HashSet<i64>, MigrateError>
where
C: IOxMigrate,
{
let lock_id = conn.generate_lock_id().await?;
<C as IOxMigrate>::lock(conn, lock_id).await?;
// creates the `_sqlx_migrations` table only if needed
// eventually this will likely migrate previous versions of the table
conn.ensure_migrations_table().await?;
let version = conn.dirty_version().await?;
if let Some(version) = version {
// We currently assume that migrations are NOT idempotent and hence we cannot re-apply them.
return Err(MigrateError::Dirty(version));
}
let applied_migrations = conn.list_applied_migrations().await?;
validate_applied_migrations(&applied_migrations, self)?;
let applied_migrations: HashSet<_> =
applied_migrations.into_iter().map(|m| m.version).collect();
let mut new_migrations = HashSet::new();
for migration in &self.migrations {
if applied_migrations.contains(&migration.version) {
continue;
}
migration.apply(conn).await?;
new_migrations.insert(migration.version);
}
// Unlock so other migrators can proceed; they will find these migrations already applied
// and therefore do nothing.
<C as IOxMigrate>::unlock(conn, lock_id).await?;
Ok(new_migrations)
}
}
impl From<&Migrator> for IOxMigrator {
fn from(migrator: &Migrator) -> Self {
assert!(
!migrator.ignore_missing,
"`Migragtor::ignore_missing` MUST NOT be set"
);
assert!(migrator.locking, "`Migrator::locking` MUST be set");
let migrations = migrator
.migrations
.iter()
.map(|migration| migration.into())
.collect::<Vec<_>>();
Self::new(migrations)
}
}
/// Validate an already-applied migration.
///
/// Checks that:
///
/// - applied migration is known
/// - checksum of applied migration and known migration match
fn validate_applied_migrations(
applied_migrations: &[AppliedMigration],
migrator: &IOxMigrator,
) -> Result<(), MigrateError> {
let migrations: HashMap<_, _> = migrator.migrations.iter().map(|m| (m.version, m)).collect();
for applied_migration in applied_migrations {
match migrations.get(&applied_migration.version) {
None => {
return Err(MigrateError::VersionMissing(applied_migration.version));
}
Some(migration) => {
if migration.checksum != applied_migration.checksum {
return Err(MigrateError::VersionMismatch(migration.version));
}
}
}
}
Ok(())
}
/// Interface between a specific database implementation (like Postgres) and the IOx migration system.
///
/// This mostly delegates to the SQLx [`Migrate`] interface but also has some extra methods.
#[async_trait]
pub trait IOxMigrate: Migrate + Send {
/// Generate a lock ID that is used for [`lock`](Self::lock) and [`unlock`](Self::unlock).
async fn generate_lock_id(&mut self) -> Result<i64, MigrateError>;
/// Lock database for migrations.
async fn lock(&mut self, lock_id: i64) -> Result<(), MigrateError>;
/// Unlock database after migration.
async fn unlock(&mut self, lock_id: i64) -> Result<(), MigrateError>;
/// Start a migration and mark it as "not finished".
async fn start_migration(&mut self, migration: &IOxMigration) -> Result<(), MigrateError>;
/// Finish a migration and register the elapsed time.
async fn finish_migration(
&mut self,
migration: &IOxMigration,
elapsed: Duration,
) -> Result<(), MigrateError>;
/// Execute a SQL statement (that may contain multiple sub-statements) within a transaction block.
///
/// Note that the SQL text can in theory contain `BEGIN`/`COMMIT` commands but shouldn't.
async fn exec_with_transaction(&mut self, sql: &str) -> Result<(), MigrateError>;
/// Execute a SQL statement (that may contain multiple sub-statements) without a transaction block.
async fn exec_without_transaction(&mut self, sql: &str) -> Result<(), MigrateError>;
}
#[async_trait]
impl IOxMigrate for PgConnection {
async fn generate_lock_id(&mut self) -> Result<i64, MigrateError> {
let db: String = query_scalar("SELECT current_database()")
.fetch_one(self)
.await?;
// A randomly generated static siphash key to ensure all migrations use the same locks.
//
// Generated with: xxd -i -l 16 /dev/urandom
let key = [
0xb8, 0x52, 0x81, 0x3c, 0x12, 0x83, 0x6f, 0xd9, 0x00, 0x4f, 0xe7, 0xe3, 0x61, 0xbd,
0x03, 0xaf,
];
let mut hasher = SipHasher13::new_with_key(&key);
db.hash(&mut hasher);
Ok(i64::from_ne_bytes(hasher.finish().to_ne_bytes()))
}
async fn lock(&mut self, lock_id: i64) -> Result<(), MigrateError> {
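// Poll the non-blocking `pg_try_advisory_lock` instead of blocking on `pg_advisory_lock`: the
// blocking variant appears to behave like a global shared lock and can deadlock concurrent
// migrators that run `CREATE INDEX CONCURRENTLY` (see `test_locking` below).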
loop {
let is_locked: bool = query_scalar("SELECT pg_try_advisory_lock($1)")
.bind(lock_id)
.fetch_one(&mut *self)
.await?;
if is_locked {
return Ok(());
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
}
async fn unlock(&mut self, lock_id: i64) -> Result<(), MigrateError> {
let _ = query("SELECT pg_advisory_unlock($1)")
.bind(lock_id)
.execute(self)
.await?;
Ok(())
}
async fn start_migration(&mut self, migration: &IOxMigration) -> Result<(), MigrateError> {
let _ = query(
r#"
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
VALUES ( $1, $2, FALSE, $3, -1 )
"#,
)
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
.execute(self)
.await?;
Ok(())
}
async fn finish_migration(
&mut self,
migration: &IOxMigration,
elapsed: Duration,
) -> Result<(), MigrateError> {
let _ = query(
r#"
UPDATE _sqlx_migrations
SET success = TRUE, execution_time = $1
WHERE version = $2
"#,
)
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)
.await?;
Ok(())
}
async fn exec_with_transaction(&mut self, sql: &str) -> Result<(), MigrateError> {
let mut tx = <Self as Connection>::begin(self).await?;
let _ = tx.execute(sql).await?;
tx.commit().await?;
Ok(())
}
async fn exec_without_transaction(&mut self, sql: &str) -> Result<(), MigrateError> {
let _ = self.execute(sql).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
mod generic {
use super::*;
#[test]
#[should_panic(
expected = "migrations are not sorted: version 2 is before 1 but should not"
)]
fn test_migrator_new_panic_not_sorted() {
IOxMigrator::new([
IOxMigration {
version: 2,
description: "".into(),
steps: vec![],
checksum: vec![].into(),
},
IOxMigration {
version: 1,
description: "".into(),
steps: vec![],
checksum: vec![].into(),
},
]);
}
#[test]
#[should_panic(expected = "migrations are not not unique: version 2 found twice")]
fn test_migrator_new_panic_not_unique() {
IOxMigrator::new([
IOxMigration {
version: 2,
description: "".into(),
steps: vec![],
checksum: vec![].into(),
},
IOxMigration {
version: 2,
description: "".into(),
steps: vec![],
checksum: vec![].into(),
},
]);
}
#[test]
#[should_panic(expected = "`Migrator::locking` MUST be set")]
fn test_convert_migrator_from_sqlx_panic_no_locking() {
let _ = IOxMigrator::from(&Migrator {
migrations: vec![].into(),
ignore_missing: false,
locking: false,
});
}
#[test]
#[should_panic(expected = "`Migragtor::ignore_missing` MUST NOT be set")]
fn test_convert_migrator_from_sqlx_panic_ignore_missing() {
let _ = IOxMigrator::from(&Migrator {
migrations: vec![].into(),
ignore_missing: true,
locking: true,
});
}
#[test]
#[should_panic(expected = "migration type has to be simple but is ReversibleUp")]
fn test_convert_migrator_from_sqlx_panic_invalid_migration_type_rev_up() {
let _ = IOxMigrator::from(&Migrator {
migrations: vec![Migration {
version: 1,
description: "".into(),
migration_type: MigrationType::ReversibleUp,
sql: "".into(),
checksum: vec![].into(),
}]
.into(),
ignore_missing: false,
locking: true,
});
}
#[test]
#[should_panic(expected = "migration type has to be simple but is ReversibleDown")]
fn test_convert_migrator_from_sqlx_panic_invalid_migration_type_rev_down() {
let _ = IOxMigrator::from(&Migrator {
migrations: vec![Migration {
version: 1,
description: "".into(),
migration_type: MigrationType::ReversibleDown,
sql: "".into(),
checksum: vec![].into(),
}]
.into(),
ignore_missing: false,
locking: true,
});
}
#[test]
fn test_convert_migrator_from_sqlx_ok() {
let actual = IOxMigrator::from(&Migrator {
migrations: vec![
Migration {
version: 1,
description: "some descr".into(),
migration_type: MigrationType::Simple,
sql: "SELECT 1;".into(),
checksum: vec![1, 2, 3].into(),
},
Migration {
version: 10,
description: "more descr".into(),
migration_type: MigrationType::Simple,
sql: "SELECT 2;\n-- IOX_STEP_BOUNDARY\n-- IOX_NO_TRANSACTION\nSELECT 3;"
.into(),
checksum: vec![4, 5, 6].into(),
},
]
.into(),
ignore_missing: false,
locking: true,
});
let expected = IOxMigrator {
migrations: vec![
IOxMigration {
version: 1,
description: "some descr".into(),
steps: vec![IOxMigrationStep::SqlStatement {
sql: "SELECT 1;".into(),
in_transaction: true,
}],
checksum: vec![1, 2, 3].into(),
},
IOxMigration {
version: 10,
description: "more descr".into(),
steps: vec![
IOxMigrationStep::SqlStatement {
sql: "SELECT 2;".into(),
in_transaction: true,
},
IOxMigrationStep::SqlStatement {
sql: "-- IOX_NO_TRANSACTION\nSELECT 3;".into(),
in_transaction: false,
},
],
checksum: vec![4, 5, 6].into(),
},
],
};
assert_eq!(actual, expected);
}
}
mod postgres {
use std::sync::Arc;
use futures::{stream::FuturesUnordered, StreamExt};
use sqlx::{pool::PoolConnection, Postgres};
use test_helpers::maybe_start_logging;
use crate::postgres::test_utils::{maybe_skip_integration, setup_db_no_migration};
use super::*;
#[tokio::test]
async fn test_step_sql_statement_no_transaction() {
maybe_skip_integration!();
let mut conn = setup().await;
let conn = &mut *conn;
conn.execute("CREATE TABLE t (x INT);").await.unwrap();
let create_index_concurrently = "CREATE INDEX CONCURRENTLY i ON t (x);";
// `CREATE INDEX CONCURRENTLY` is NOT possible w/ a transaction. Verify that.
IOxMigrationStep::SqlStatement {
sql: create_index_concurrently.into(),
in_transaction: true,
}
.apply(conn)
.await
.unwrap_err();
// ... but it IS possible w/o a transaction.
IOxMigrationStep::SqlStatement {
sql: create_index_concurrently.into(),
in_transaction: false,
}
.apply(conn)
.await
.unwrap();
}
#[tokio::test]
async fn test_migrator_happy_path() {
maybe_skip_integration!();
let mut conn = setup().await;
let conn = &mut *conn;
let migrator = IOxMigrator::new([
IOxMigration {
version: 1,
description: "".into(),
steps: vec![
IOxMigrationStep::SqlStatement {
sql: "CREATE TABLE t (x INT);".into(),
in_transaction: false,
},
IOxMigrationStep::SqlStatement {
sql: "INSERT INTO t (x) VALUES (1); INSERT INTO t (x) VALUES (10);"
.into(),
in_transaction: true,
},
],
checksum: vec![].into(),
},
IOxMigration {
version: 2,
description: "".into(),
steps: vec![IOxMigrationStep::SqlStatement {
sql: "INSERT INTO t (x) VALUES (100);".into(),
in_transaction: true,
}],
checksum: vec![].into(),
},
]);
let applied = migrator.run_direct(conn).await.unwrap();
assert_eq!(applied, HashSet::from([1, 2]));
let r = sqlx::query_as::<_, Res>("SELECT SUM(x)::INT AS r FROM t;")
.fetch_one(conn)
.await
.unwrap()
.r;
assert_eq!(r, 111);
}
#[tokio::test]
async fn test_migrator_only_apply_new_migrations() {
maybe_skip_integration!();
let mut conn = setup().await;
let conn = &mut *conn;
let migrator = IOxMigrator::new([IOxMigration {
version: 1,
description: "".into(),
steps: vec![IOxMigrationStep::SqlStatement {
// NOT idempotent!
sql: "CREATE TABLE t (x INT);".into(),
in_transaction: false,
}],
checksum: vec![].into(),
}]);
let applied = migrator.run_direct(conn).await.unwrap();
assert_eq!(applied, HashSet::from([1]));
let migrator =
IOxMigrator::new(migrator.migrations.iter().cloned().chain([IOxMigration {
version: 2,
description: "".into(),
steps: vec![IOxMigrationStep::SqlStatement {
// NOT idempotent!
sql: "CREATE TABLE s (x INT);".into(),
in_transaction: false,
}],
checksum: vec![].into(),
}]));
let applied = migrator.run_direct(conn).await.unwrap();
assert_eq!(applied, HashSet::from([2]));
let applied = migrator.run_direct(conn).await.unwrap();
assert_eq!(applied, HashSet::from([]));
}
#[tokio::test]
async fn test_migrator_fail_migration_missing() {
maybe_skip_integration!();
let mut conn = setup().await;
let conn = &mut *conn;
let migrator = IOxMigrator::new([IOxMigration {
version: 1,
description: "".into(),
steps: vec![],
checksum: vec![].into(),
}]);
migrator.run_direct(conn).await.unwrap();
let migrator = IOxMigrator::new([IOxMigration {
version: 2,
description: "".into(),
steps: vec![],
checksum: vec![].into(),
}]);
let err = migrator.run_direct(conn).await.unwrap_err();
assert_eq!(
err.to_string(),
"migration 1 was previously applied but is missing in the resolved migrations"
);
}
#[tokio::test]
async fn test_migrator_fail_checksum_mismatch() {
maybe_skip_integration!();
let mut conn = setup().await;
let conn = &mut *conn;
let migrator = IOxMigrator::new([IOxMigration {
version: 1,
description: "".into(),
steps: vec![],
checksum: vec![1, 2, 3].into(),
}]);
migrator.run_direct(conn).await.unwrap();
let migrator = IOxMigrator::new([IOxMigration {
version: 1,
description: "".into(),
steps: vec![],
checksum: vec![4, 5, 6].into(),
}]);
let err = migrator.run_direct(conn).await.unwrap_err();
assert_eq!(
err.to_string(),
"migration 1 was previously applied but has been modified"
);
}
#[tokio::test]
async fn test_migrator_fail_dirty() {
maybe_skip_integration!();
let mut conn = setup().await;
let conn = &mut *conn;
let migrator = IOxMigrator::new([IOxMigration {
version: 1,
description: "".into(),
steps: vec![IOxMigrationStep::SqlStatement {
sql: "foo".into(),
in_transaction: false,
}],
checksum: vec![1, 2, 3].into(),
}]);
migrator.run_direct(conn).await.unwrap_err();
let migrator = IOxMigrator::new([IOxMigration {
version: 1,
description: "".into(),
steps: vec![],
// same checksum, but now w/ valid steps (to simulate a once failed SQL statement)
checksum: vec![1, 2, 3].into(),
}]);
let err = migrator.run_direct(conn).await.unwrap_err();
assert_eq!(
err.to_string(),
"migration 1 is partially applied; fix and remove row from `_sqlx_migrations` table"
);
}
/// Tests that `CREATE INDEX CONCURRENTLY` doesn't deadlock.
///
/// Originally we used SQLx to acquire the locks which uses `pg_advisory_lock`. However this seems to acquire a
/// global "shared lock". Other migration frameworks faced the same issue and use `pg_try_advisory_lock`
/// instead. Also see:
///
/// - <https://github.com/flyway/flyway/issues/1654>
/// - <https://github.com/flyway/flyway/commit/4a185ebcddfb7dac875b7afa5fa270aca621ce1d>
#[tokio::test]
async fn test_locking() {
const N_TABLES_AND_INDICES: usize = 10;
const N_CONCURRENT_MIGRATIONS: usize = 100;
maybe_skip_integration!();
maybe_start_logging();
let pool = setup_db_no_migration().await.into_pool();
let migrator = Arc::new(IOxMigrator::new((0..N_TABLES_AND_INDICES).map(|i| {
IOxMigration {
version: i as i64,
description: "".into(),
steps: vec![
IOxMigrationStep::SqlStatement {
sql: format!("CREATE TABLE t{i} (x INT);").into(),
in_transaction: false,
},
IOxMigrationStep::SqlStatement {
sql: format!("CREATE INDEX CONCURRENTLY i{i} ON t{i} (x);").into(),
in_transaction: false,
},
],
checksum: vec![].into(),
}
})));
let mut futures: FuturesUnordered<_> = (0..N_CONCURRENT_MIGRATIONS)
.map(move |_| {
let migrator = Arc::clone(&migrator);
let pool = pool.clone();
async move {
// pool might timeout, so add another retry loop around it
let mut conn = loop {
let pool = pool.clone();
if let Ok(conn) = pool.acquire().await {
break conn;
}
};
let conn = &mut *conn;
migrator.run_direct(conn).await.unwrap();
}
})
.collect();
while futures.next().await.is_some() {}
}
async fn setup() -> PoolConnection<Postgres> {
maybe_start_logging();
let pool = setup_db_no_migration().await.into_pool();
pool.acquire().await.unwrap()
}
#[derive(sqlx::FromRow)]
struct Res {
r: i32,
}
}
}


@@ -11,6 +11,7 @@ use crate::{
TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
},
metrics::MetricDecorator,
migrate::IOxMigrator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
};
use async_trait::async_trait;
@@ -127,6 +128,11 @@ impl PostgresCatalog {
fn schema_name(&self) -> &str {
&self.options.schema_name
}
#[cfg(test)]
pub(crate) fn into_pool(self) -> HotSwapPool<Postgres> {
self.pool
}
}
impl Display for PostgresCatalog {
@@ -233,7 +239,8 @@ impl Catalog for PostgresCatalog {
.await
.map_err(|e| Error::Setup { source: e })?;
MIGRATOR
let migrator = IOxMigrator::from(&MIGRATOR);
migrator
.run(&self.pool)
.await
.map_err(|e| Error::Setup { source: e.into() })?;
@@ -498,7 +505,7 @@ impl RepoCollection for PostgresTxn {
impl NamespaceRepo for PostgresTxn {
async fn create(
&mut self,
name: &NamespaceName,
name: &NamespaceName<'_>,
partition_template: Option<NamespacePartitionTemplateOverride>,
retention_period_ns: Option<i64>,
) -> Result<Namespace> {
@@ -1614,36 +1621,32 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
false
}
/// Test helpers for postgres testing.
#[cfg(test)]
mod tests {
pub(crate) mod test_utils {
use super::*;
use crate::test_helpers::{arbitrary_namespace, arbitrary_table};
use assert_matches::assert_matches;
use data_types::{partition_template::TemplatePart, ColumnId, ColumnSet};
use generated_types::influxdata::iox::partition_template::v1 as proto;
use metric::{Attributes, DurationHistogram, Metric};
use rand::Rng;
use sqlx::migrate::MigrateDatabase;
use std::{env, io::Write, sync::Arc, time::Instant};
use tempfile::NamedTempFile;
// Helper macro to skip tests if TEST_INTEGRATION and TEST_INFLUXDB_IOX_CATALOG_DSN environment
// variables are not set.
pub const TEST_DSN_ENV: &str = "TEST_INFLUXDB_IOX_CATALOG_DSN";
/// Helper macro to skip tests if TEST_INTEGRATION and TEST_INFLUXDB_IOX_CATALOG_DSN environment
/// variables are not set.
macro_rules! maybe_skip_integration {
($panic_msg:expr) => {{
dotenvy::dotenv().ok();
let required_vars = ["TEST_INFLUXDB_IOX_CATALOG_DSN"];
let required_vars = [crate::postgres::test_utils::TEST_DSN_ENV];
let unset_vars: Vec<_> = required_vars
.iter()
.filter_map(|&name| match env::var(name) {
.filter_map(|&name| match std::env::var(name) {
Ok(_) => None,
Err(_) => Some(name),
})
.collect();
let unset_var_names = unset_vars.join(", ");
let force = env::var("TEST_INTEGRATION");
let force = std::env::var("TEST_INTEGRATION");
if force.is_ok() && !unset_var_names.is_empty() {
panic!(
@@ -1674,19 +1677,9 @@ mod tests {
};
}
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
let histogram = metrics
.get_instrument::<Metric<DurationHistogram>>("catalog_op_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
.expect("failed to get observer")
.fetch();
pub(crate) use maybe_skip_integration;
let hit_count = histogram.sample_count();
assert!(hit_count > 0, "metric did not record any calls");
}
async fn create_db(dsn: &str) {
pub async fn create_db(dsn: &str) {
// Create the catalog database if it doesn't exist
if !Postgres::database_exists(dsn).await.unwrap() {
// Ignore failure if another test has already created the database
@@ -1694,7 +1687,7 @@ mod tests {
}
}
async fn setup_db() -> PostgresCatalog {
pub async fn setup_db_no_migration() -> PostgresCatalog {
// create a random schema for this particular pool
let schema_name = {
// use scope to make it clear to clippy / rust that `rng` is
@@ -1741,10 +1734,59 @@
.await
.expect("failed to grant privileges to schema");
pg
}
pub async fn setup_db() -> PostgresCatalog {
let pg = setup_db_no_migration().await;
// Run the migrations against this random schema.
pg.setup().await.expect("failed to initialise database");
pg
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
postgres::test_utils::{
create_db, maybe_skip_integration, setup_db, setup_db_no_migration,
},
test_helpers::{arbitrary_namespace, arbitrary_table},
};
use assert_matches::assert_matches;
use data_types::{partition_template::TemplatePart, ColumnId, ColumnSet};
use generated_types::influxdata::iox::partition_template::v1 as proto;
use metric::{Attributes, DurationHistogram, Metric};
use std::{io::Write, sync::Arc, time::Instant};
use tempfile::NamedTempFile;
use test_helpers::maybe_start_logging;
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
let histogram = metrics
.get_instrument::<Metric<DurationHistogram>>("catalog_op_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
.expect("failed to get observer")
.fetch();
let hit_count = histogram.sample_count();
assert!(hit_count > 0, "metric did not record any calls");
}
#[tokio::test]
async fn test_migration() {
maybe_skip_integration!();
maybe_start_logging();
let postgres = setup_db_no_migration().await;
// 1st setup
postgres.setup().await.unwrap();
// 2nd setup
postgres.setup().await.unwrap();
}
#[tokio::test]
async fn test_catalog() {


@@ -261,7 +261,7 @@ impl RepoCollection for SqliteTxn {
impl NamespaceRepo for SqliteTxn {
async fn create(
&mut self,
name: &NamespaceName,
name: &NamespaceName<'_>,
partition_template: Option<NamespacePartitionTemplateOverride>,
retention_period_ns: Option<i64>,
) -> Result<Namespace> {