From aa74bb62924b78d0729b53e82ef8ec86d70329f3 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Thu, 9 Feb 2023 17:21:55 +0100
Subject: [PATCH] test(catalog): independent test state

All our catalog tests run as one test, over one database connection.

Prior to this commit there was no state reset during test execution, so
earlier tests polluted the state observed by later tests. Each test had
to assert on its own entities while carefully ignoring entities created
by previously-run tests, making the suite increasingly complex and
intermingled (or the order of test execution had to be shuffled in
search of the golden ordering that makes everything great again).

This commit gives each test its own clean state: the shared
test_catalog() helper now takes a factory closure that yields a fresh
catalog instance, and every test runs against its own catalog.

This is a bit of a hack, and is not how I'd have structured catalog
testing w/ clean state if I were writing it fresh. It is what it is.

This has been driving me mad for SO LONG. It's SO BAD.
---
 iox_catalog/src/interface.rs | 94 ++++++++++++++++++++++--------------
 iox_catalog/src/mem.rs       |  8 ++-
 iox_catalog/src/postgres.rs  | 37 ++++++++++++++++-
 iox_catalog/src/sqlite.rs    |  9 ++--
 4 files changed, 105 insertions(+), 43 deletions(-)

diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index edb1e08c98..c63ba33bd6 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -956,6 +956,7 @@ pub(crate) mod test_helpers {
     use data_types::{
         ColumnId, ColumnSet, CompactionLevel, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
     };
+    use futures::Future;
     use metric::{Attributes, DurationHistogram, Metric};
     use std::{
         collections::BTreeSet,
@@ -964,43 +965,64 @@ pub(crate) mod test_helpers {
         time::Duration,
     };
 
-    pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
-        test_setup(Arc::clone(&catalog)).await;
-        test_partitions_with_recent_created_files(Arc::clone(&catalog)).await;
-        test_most_cold_files_partitions(Arc::clone(&catalog)).await;
-        test_topic(Arc::clone(&catalog)).await;
-        test_query_pool(Arc::clone(&catalog)).await;
-        test_namespace(Arc::clone(&catalog)).await;
-        test_table(Arc::clone(&catalog)).await;
-        test_column(Arc::clone(&catalog)).await;
-        test_shards(Arc::clone(&catalog)).await;
-        test_partition(Arc::clone(&catalog)).await;
-        test_tombstone(Arc::clone(&catalog)).await;
-        test_tombstones_by_parquet_file(Arc::clone(&catalog)).await;
-        test_parquet_file(Arc::clone(&catalog)).await;
-        test_parquet_file_delete_broken(Arc::clone(&catalog)).await;
-        test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
-        test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
-        test_recent_highest_throughput_partitions(Arc::clone(&catalog)).await;
-        test_partitions_with_small_l1_file_count(Arc::clone(&catalog)).await;
-        test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
-        test_processed_tombstones(Arc::clone(&catalog)).await;
-        test_list_by_partiton_not_to_delete(Arc::clone(&catalog)).await;
-        test_txn_isolation(Arc::clone(&catalog)).await;
-        test_txn_drop(Arc::clone(&catalog)).await;
-        test_list_schemas(Arc::clone(&catalog)).await;
-        test_delete_namespace(Arc::clone(&catalog)).await;
+    pub(crate) async fn test_catalog<R, F>(clean_state: R)
+    where
+        R: Fn() -> F + Send + Sync,
+        F: Future<Output = Arc<dyn Catalog>> + Send,
+    {
+        test_setup(clean_state().await).await;
+        test_partitions_with_recent_created_files(clean_state().await).await;
+        test_most_cold_files_partitions(clean_state().await).await;
+        test_query_pool(clean_state().await).await;
+        test_column(clean_state().await).await;
+        test_partition(clean_state().await).await;
+        test_tombstone(clean_state().await).await;
+        test_tombstones_by_parquet_file(clean_state().await).await;
+        test_parquet_file(clean_state().await).await;
+        test_parquet_file_delete_broken(clean_state().await).await;
+        test_parquet_file_compaction_level_0(clean_state().await).await;
+        test_parquet_file_compaction_level_1(clean_state().await).await;
+        test_recent_highest_throughput_partitions(clean_state().await).await;
+        test_partitions_with_small_l1_file_count(clean_state().await).await;
+        test_update_to_compaction_level_1(clean_state().await).await;
+        test_processed_tombstones(clean_state().await).await;
+        test_list_by_partiton_not_to_delete(clean_state().await).await;
+        test_txn_isolation(clean_state().await).await;
+        test_txn_drop(clean_state().await).await;
+        test_list_schemas(clean_state().await).await;
+        test_delete_namespace(clean_state().await).await;
 
-        let metrics = catalog.metrics();
-        assert_metric_hit(&metrics, "topic_create_or_get");
-        assert_metric_hit(&metrics, "query_create_or_get");
-        assert_metric_hit(&metrics, "namespace_create");
-        assert_metric_hit(&metrics, "table_create_or_get");
-        assert_metric_hit(&metrics, "column_create_or_get");
-        assert_metric_hit(&metrics, "shard_create_or_get");
-        assert_metric_hit(&metrics, "partition_create_or_get");
-        assert_metric_hit(&metrics, "tombstone_create_or_get");
-        assert_metric_hit(&metrics, "parquet_create");
+        let catalog = clean_state().await;
+        test_topic(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "topic_create_or_get");
+
+        let catalog = clean_state().await;
+        test_namespace(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "namespace_create");
+
+        let catalog = clean_state().await;
+        test_table(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "table_create_or_get");
+
+        let catalog = clean_state().await;
+        test_column(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "column_create_or_get");
+
+        let catalog = clean_state().await;
+        test_shards(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "shard_create_or_get");
+
+        let catalog = clean_state().await;
+        test_partition(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "partition_create_or_get");
+
+        let catalog = clean_state().await;
+        test_tombstone(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "tombstone_create_or_get");
+
+        let catalog = clean_state().await;
+        test_parquet_file(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "parquet_create");
     }
 
     async fn test_setup(catalog: Arc<dyn Catalog>) {
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index 2fa3b9fe4e..94dfb44608 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -1794,7 +1794,11 @@ mod tests {
 
     #[tokio::test]
     async fn test_catalog() {
-        let metrics = Arc::new(metric::Registry::default());
-        crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new(metrics))).await;
+        crate::interface::test_helpers::test_catalog(|| async {
+            let metrics = Arc::new(metric::Registry::default());
+            let x: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
+            x
+        })
+        .await;
     }
 }
diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index a35d2a2ae1..5894858c50 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -2577,11 +2577,44 @@ mod tests {
         let tz: String = sqlx::query_scalar("SHOW TIME ZONE;")
             .fetch_one(&postgres.pool)
             .await
-            .expect("read application_name");
+            .expect("read time zone");
         assert_eq!(tz, "UTC");
 
+        let pool = postgres.pool.clone();
+        let schema_name = postgres.schema_name.clone();
+        let postgres: Arc<dyn Catalog> = Arc::new(postgres);
 
-        crate::interface::test_helpers::test_catalog(postgres).await;
+        crate::interface::test_helpers::test_catalog(|| async {
+            // Clean the schema.
+            pool
+                .execute(format!("DROP SCHEMA {schema_name} CASCADE").as_str())
+                .await
+                .expect("failed to clean schema between tests");
+
+            // Recreate the test schema
+            pool
+                .execute(format!("CREATE SCHEMA {schema_name};").as_str())
+                .await
+                .expect("failed to create test schema");
+
+            // Ensure the test user has permission to interact with the test schema.
+            pool
+                .execute(
+                    format!(
+                        "GRANT USAGE ON SCHEMA {schema_name} TO public; GRANT CREATE ON SCHEMA {schema_name} TO public;"
+                    )
+                    .as_str(),
+                )
+                .await
+                .expect("failed to grant privileges to schema");
+
+            // Run the migrations against this random schema.
+            postgres.setup().await.expect("failed to initialise database");
+
+            Arc::clone(&postgres)
+        })
+        .await;
     }
 
     #[tokio::test]
diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs
index 8123a99ed6..c9b538c482 100644
--- a/iox_catalog/src/sqlite.rs
+++ b/iox_catalog/src/sqlite.rs
@@ -2371,9 +2371,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_catalog() {
-        let sqlite = setup_db().await;
-        let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
-        interface::test_helpers::test_catalog(sqlite).await;
+        interface::test_helpers::test_catalog(|| async {
+            let sqlite = setup_db().await;
+            let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
+            sqlite
+        })
+        .await;
     }
 
     #[tokio::test]
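
The core of the change is the new shape of test_catalog() in interface.rs:
instead of receiving one shared Arc<dyn Catalog>, it receives a factory
closure and awaits it once per test. Below is a minimal, self-contained
sketch of that pattern, not IOx code: the Catalog trait, MemCatalog type,
and run_tests function are stand-ins, and it assumes tokio is available
(as the tests in this patch do). std::future::Future is the same trait the
futures crate re-exports, and two generic parameters are needed because
`|| async { .. }` is an ordinary closure returning an anonymous Future
type, not an async closure.

    use std::future::Future;
    use std::sync::Arc;

    /// Stand-in for the real `iox_catalog` `Catalog` trait.
    trait Catalog: Send + Sync {
        fn name(&self) -> &'static str;
    }

    /// Stand-in for `MemCatalog`.
    struct MemCatalog;

    impl Catalog for MemCatalog {
        fn name(&self) -> &'static str {
            "memory"
        }
    }

    /// Mirrors the shape of the new `test_catalog<R, F>`: `clean_state` is
    /// a factory closure, and every call site awaits it for a fresh backend.
    async fn run_tests<R, F>(clean_state: R)
    where
        R: Fn() -> F + Send + Sync,
        F: Future<Output = Arc<dyn Catalog>> + Send,
    {
        // Each logical test gets its own, freshly constructed catalog, so
        // no state leaks from one test to the next.
        let catalog = clean_state().await;
        assert_eq!(catalog.name(), "memory");

        let catalog = clean_state().await;
        assert_eq!(catalog.name(), "memory");
    }

    #[tokio::main]
    async fn main() {
        run_tests(|| async {
            let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog);
            catalog
        })
        .await;
    }

The in-memory and SQLite backends can construct a whole new catalog per
call, which is why their closures simply build one from scratch.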
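The Postgres backend cannot rebuild the database per call, so its closure
instead drops and recreates the per-test schema before re-running the
migrations. As a usage note, that reset could be factored into a helper;
the sketch below is hypothetical (reset_schema and its signature are not
part of the patch) and assumes a sqlx PgPool plus a schema name owned by
the test. It relies only on Executor::execute accepting a plain SQL
string, exactly as the patch itself does.

    use sqlx::{Executor, PgPool};

    /// Hypothetical helper: drop and recreate the per-test schema so each
    /// test starts from an empty catalog, then re-grant the privileges the
    /// test user needs before the migrations are re-run.
    async fn reset_schema(pool: &PgPool, schema_name: &str) -> Result<(), sqlx::Error> {
        // Remove everything created by the previous test.
        pool.execute(format!("DROP SCHEMA {schema_name} CASCADE").as_str())
            .await?;
        // Recreate the empty schema.
        pool.execute(format!("CREATE SCHEMA {schema_name};").as_str())
            .await?;
        // Restore the permissions the test user relies on.
        pool.execute(
            format!(
                "GRANT USAGE ON SCHEMA {schema_name} TO public; GRANT CREATE ON SCHEMA {schema_name} TO public;"
            )
            .as_str(),
        )
        .await?;
        Ok(())
    }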