test(catalog): independent test state

All our catalog tests run as one test, over one database connection.
Prior to this commit, there was no state reset during test execution,
so earlier tests would pollute the state of later tests. This produced
an increasingly complex and intermingled set of tests, each trying to
assert its own entities while ignoring other, previously created
entities (or changing the order of test execution in search of the
golden ordering that makes everything great again).

This is a bit of a hack, and is not how I'd have structured catalog
testing w/ clean state if I were writing it fresh. It is what it is.

This has been driving me mad for SO LONG it's SO BAD <shakes fist>.
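
The gist of the fix, as a minimal sketch (the real version is in the
first diff below): the shared `test_catalog` harness now takes a
factory closure that yields a freshly initialised catalog, and calls it
once per test, instead of threading one shared instance through every
test.

    // Sketch mirroring the diff below: each test receives its own clean
    // catalog, so tests can no longer observe each other's state.
    pub(crate) async fn test_catalog<R, F>(clean_state: R)
    where
        R: Fn() -> F + Send + Sync,
        F: Future<Output = Arc<dyn Catalog>> + Send,
    {
        test_setup(clean_state().await).await;
        // ...every remaining test follows the same clean_state() pattern...
    }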
Dom Dwyer 2023-02-09 17:21:55 +01:00
parent fd42f94fb8
commit aa74bb6292
4 changed files with 105 additions and 43 deletions


@@ -956,6 +956,7 @@ pub(crate) mod test_helpers {
     use data_types::{
         ColumnId, ColumnSet, CompactionLevel, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
     };
+    use futures::Future;
     use metric::{Attributes, DurationHistogram, Metric};
     use std::{
         collections::BTreeSet,
@@ -964,43 +965,64 @@ pub(crate) mod test_helpers {
         time::Duration,
     };
 
-    pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
-        test_setup(Arc::clone(&catalog)).await;
-        test_partitions_with_recent_created_files(Arc::clone(&catalog)).await;
-        test_most_cold_files_partitions(Arc::clone(&catalog)).await;
-        test_topic(Arc::clone(&catalog)).await;
-        test_query_pool(Arc::clone(&catalog)).await;
-        test_namespace(Arc::clone(&catalog)).await;
-        test_table(Arc::clone(&catalog)).await;
-        test_column(Arc::clone(&catalog)).await;
-        test_shards(Arc::clone(&catalog)).await;
-        test_partition(Arc::clone(&catalog)).await;
-        test_tombstone(Arc::clone(&catalog)).await;
-        test_tombstones_by_parquet_file(Arc::clone(&catalog)).await;
-        test_parquet_file(Arc::clone(&catalog)).await;
-        test_parquet_file_delete_broken(Arc::clone(&catalog)).await;
-        test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
-        test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
-        test_recent_highest_throughput_partitions(Arc::clone(&catalog)).await;
-        test_partitions_with_small_l1_file_count(Arc::clone(&catalog)).await;
-        test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
-        test_processed_tombstones(Arc::clone(&catalog)).await;
-        test_list_by_partiton_not_to_delete(Arc::clone(&catalog)).await;
-        test_txn_isolation(Arc::clone(&catalog)).await;
-        test_txn_drop(Arc::clone(&catalog)).await;
-        test_list_schemas(Arc::clone(&catalog)).await;
-        test_delete_namespace(Arc::clone(&catalog)).await;
+    pub(crate) async fn test_catalog<R, F>(clean_state: R)
+    where
+        R: Fn() -> F + Send + Sync,
+        F: Future<Output = Arc<dyn Catalog>> + Send,
+    {
+        test_setup(clean_state().await).await;
+        test_partitions_with_recent_created_files(clean_state().await).await;
+        test_most_cold_files_partitions(clean_state().await).await;
+        test_query_pool(clean_state().await).await;
+        test_column(clean_state().await).await;
+        test_partition(clean_state().await).await;
+        test_tombstone(clean_state().await).await;
+        test_tombstones_by_parquet_file(clean_state().await).await;
+        test_parquet_file(clean_state().await).await;
+        test_parquet_file_delete_broken(clean_state().await).await;
+        test_parquet_file_compaction_level_0(clean_state().await).await;
+        test_parquet_file_compaction_level_1(clean_state().await).await;
+        test_recent_highest_throughput_partitions(clean_state().await).await;
+        test_partitions_with_small_l1_file_count(clean_state().await).await;
+        test_update_to_compaction_level_1(clean_state().await).await;
+        test_processed_tombstones(clean_state().await).await;
+        test_list_by_partiton_not_to_delete(clean_state().await).await;
+        test_txn_isolation(clean_state().await).await;
+        test_txn_drop(clean_state().await).await;
+        test_list_schemas(clean_state().await).await;
+        test_delete_namespace(clean_state().await).await;
 
-        let metrics = catalog.metrics();
-        assert_metric_hit(&metrics, "topic_create_or_get");
-        assert_metric_hit(&metrics, "query_create_or_get");
-        assert_metric_hit(&metrics, "namespace_create");
-        assert_metric_hit(&metrics, "table_create_or_get");
-        assert_metric_hit(&metrics, "column_create_or_get");
-        assert_metric_hit(&metrics, "shard_create_or_get");
-        assert_metric_hit(&metrics, "partition_create_or_get");
-        assert_metric_hit(&metrics, "tombstone_create_or_get");
-        assert_metric_hit(&metrics, "parquet_create");
+        let catalog = clean_state().await;
+        test_topic(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "topic_create_or_get");
+
+        let catalog = clean_state().await;
+        test_namespace(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "namespace_create");
+
+        let catalog = clean_state().await;
+        test_table(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "table_create_or_get");
+
+        let catalog = clean_state().await;
+        test_column(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "column_create_or_get");
+
+        let catalog = clean_state().await;
+        test_shards(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "shard_create_or_get");
+
+        let catalog = clean_state().await;
+        test_partition(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "partition_create_or_get");
+
+        let catalog = clean_state().await;
+        test_tombstone(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "tombstone_create_or_get");
+
+        let catalog = clean_state().await;
+        test_parquet_file(Arc::clone(&catalog)).await;
+        assert_metric_hit(&catalog.metrics(), "parquet_create");
     }
 
     async fn test_setup(catalog: Arc<dyn Catalog>) {
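
Note the shape of the bound: `clean_state` is an ordinary closure
returning a future, not an async closure (which was still unstable when
this was written), so callers satisfy it with a closure wrapping an
async block. A sketch of the call shape, where `build_catalog` is a
hypothetical stand-in for a backend's constructor:

    // The closure is the `R: Fn() -> F`; the async block it returns is
    // the `F: Future<Output = Arc<dyn Catalog>>`.
    test_catalog(|| async {
        // `build_catalog` is hypothetical; each backend supplies its own.
        let catalog: Arc<dyn Catalog> = Arc::new(build_catalog().await);
        catalog
    })
    .await;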


@@ -1794,7 +1794,11 @@ mod tests {
     #[tokio::test]
     async fn test_catalog() {
-        let metrics = Arc::new(metric::Registry::default());
-        crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new(metrics))).await;
+        crate::interface::test_helpers::test_catalog(|| async {
+            let metrics = Arc::new(metric::Registry::default());
+            let x: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
+            x
+        })
+        .await;
     }
 }
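
The seemingly redundant `let x: Arc<dyn Catalog> = ...; x` dance is
load-bearing: the async block's return type is inferred from its tail
expression, and without the annotation that would be `Arc<MemCatalog>`,
which does not satisfy the `F: Future<Output = Arc<dyn Catalog>>`
bound. A self-contained illustration of the coercion, using toy types
rather than the real ones:

    use std::sync::Arc;

    trait Catalog {}
    struct MemCatalog;
    impl Catalog for MemCatalog {}

    fn main() {
        // Inferred as Arc<MemCatalog>: the concrete type.
        let concrete = Arc::new(MemCatalog);
        // The explicit annotation drives the unsized coercion to a trait object.
        let x: Arc<dyn Catalog> = concrete;
        let _ = x;
    }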


@@ -2577,11 +2577,44 @@ mod tests {
         let tz: String = sqlx::query_scalar("SHOW TIME ZONE;")
             .fetch_one(&postgres.pool)
             .await
-            .expect("read application_name");
+            .expect("read time zone");
         assert_eq!(tz, "UTC");
 
+        let pool = postgres.pool.clone();
+        let schema_name = postgres.schema_name.clone();
         let postgres: Arc<dyn Catalog> = Arc::new(postgres);
-        crate::interface::test_helpers::test_catalog(postgres).await;
+        crate::interface::test_helpers::test_catalog(|| async {
+            // Clean the schema.
+            pool
+                .execute(format!("DROP SCHEMA {schema_name} CASCADE").as_str())
+                .await
+                .expect("failed to clean schema between tests");
+
+            // Recreate the test schema
+            pool
+                .execute(format!("CREATE SCHEMA {schema_name};").as_str())
+                .await
+                .expect("failed to create test schema");
+
+            // Ensure the test user has permission to interact with the test schema.
+            pool
+                .execute(
+                    format!(
+                        "GRANT USAGE ON SCHEMA {schema_name} TO public; GRANT CREATE ON SCHEMA {schema_name} TO public;"
+                    )
+                    .as_str(),
+                )
+                .await
+                .expect("failed to grant privileges to schema");
+
+            // Run the migrations against this random schema.
+            postgres.setup().await.expect("failed to initialise database");
+
+            Arc::clone(&postgres)
+        })
+        .await;
     }
 
     #[tokio::test]
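
The Postgres factory cannot simply build a new catalog, because the
database server outlives each test: instead it drops and recreates the
test-local schema, then re-runs the migrations, so every invocation
hands back an empty, fully migrated catalog. Were the inline block to
grow further, it could be factored into a helper; a hypothetical sketch
(not part of this commit), reusing the `pool` and `schema_name` values
captured above:

    use sqlx::{Executor, Pool, Postgres};

    // Hypothetical helper bundling the per-test reset that the closure
    // above performs inline.
    async fn reset_schema(pool: &Pool<Postgres>, schema_name: &str) {
        pool.execute(format!("DROP SCHEMA {schema_name} CASCADE").as_str())
            .await
            .expect("failed to clean schema between tests");
        pool.execute(format!("CREATE SCHEMA {schema_name};").as_str())
            .await
            .expect("failed to create test schema");
        pool.execute(
            format!(
                "GRANT USAGE ON SCHEMA {schema_name} TO public; \
                 GRANT CREATE ON SCHEMA {schema_name} TO public;"
            )
            .as_str(),
        )
        .await
        .expect("failed to grant privileges to schema");
    }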


@@ -2371,9 +2371,12 @@ mod tests {
     #[tokio::test]
     async fn test_catalog() {
-        let sqlite = setup_db().await;
-        let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
-        interface::test_helpers::test_catalog(sqlite).await;
+        interface::test_helpers::test_catalog(|| async {
+            let sqlite = setup_db().await;
+            let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
+            sqlite
+        })
+        .await;
     }
 
     #[tokio::test]