Merge pull request #6931 from influxdata/dom/clean-catalog-test-state

test(catalog): independent test state
pull/24376/head
kodiakhq[bot] 2023-02-09 16:44:37 +00:00 committed by GitHub
commit 822b1ffe18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 43 deletions

View File

@ -956,6 +956,7 @@ pub(crate) mod test_helpers {
use data_types::{
ColumnId, ColumnSet, CompactionLevel, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
};
use futures::Future;
use metric::{Attributes, DurationHistogram, Metric};
use std::{
collections::BTreeSet,
@ -964,43 +965,64 @@ pub(crate) mod test_helpers {
time::Duration,
};
pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
test_setup(Arc::clone(&catalog)).await;
test_partitions_with_recent_created_files(Arc::clone(&catalog)).await;
test_most_cold_files_partitions(Arc::clone(&catalog)).await;
test_topic(Arc::clone(&catalog)).await;
test_query_pool(Arc::clone(&catalog)).await;
test_namespace(Arc::clone(&catalog)).await;
test_table(Arc::clone(&catalog)).await;
test_column(Arc::clone(&catalog)).await;
test_shards(Arc::clone(&catalog)).await;
test_partition(Arc::clone(&catalog)).await;
test_tombstone(Arc::clone(&catalog)).await;
test_tombstones_by_parquet_file(Arc::clone(&catalog)).await;
test_parquet_file(Arc::clone(&catalog)).await;
test_parquet_file_delete_broken(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
test_recent_highest_throughput_partitions(Arc::clone(&catalog)).await;
test_partitions_with_small_l1_file_count(Arc::clone(&catalog)).await;
test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
test_processed_tombstones(Arc::clone(&catalog)).await;
test_list_by_partiton_not_to_delete(Arc::clone(&catalog)).await;
test_txn_isolation(Arc::clone(&catalog)).await;
test_txn_drop(Arc::clone(&catalog)).await;
test_list_schemas(Arc::clone(&catalog)).await;
test_delete_namespace(Arc::clone(&catalog)).await;
pub(crate) async fn test_catalog<R, F>(clean_state: R)
where
R: Fn() -> F + Send + Sync,
F: Future<Output = Arc<dyn Catalog>> + Send,
{
test_setup(clean_state().await).await;
test_partitions_with_recent_created_files(clean_state().await).await;
test_most_cold_files_partitions(clean_state().await).await;
test_query_pool(clean_state().await).await;
test_column(clean_state().await).await;
test_partition(clean_state().await).await;
test_tombstone(clean_state().await).await;
test_tombstones_by_parquet_file(clean_state().await).await;
test_parquet_file(clean_state().await).await;
test_parquet_file_delete_broken(clean_state().await).await;
test_parquet_file_compaction_level_0(clean_state().await).await;
test_parquet_file_compaction_level_1(clean_state().await).await;
test_recent_highest_throughput_partitions(clean_state().await).await;
test_partitions_with_small_l1_file_count(clean_state().await).await;
test_update_to_compaction_level_1(clean_state().await).await;
test_processed_tombstones(clean_state().await).await;
test_list_by_partiton_not_to_delete(clean_state().await).await;
test_txn_isolation(clean_state().await).await;
test_txn_drop(clean_state().await).await;
test_list_schemas(clean_state().await).await;
test_delete_namespace(clean_state().await).await;
let metrics = catalog.metrics();
assert_metric_hit(&metrics, "topic_create_or_get");
assert_metric_hit(&metrics, "query_create_or_get");
assert_metric_hit(&metrics, "namespace_create");
assert_metric_hit(&metrics, "table_create_or_get");
assert_metric_hit(&metrics, "column_create_or_get");
assert_metric_hit(&metrics, "shard_create_or_get");
assert_metric_hit(&metrics, "partition_create_or_get");
assert_metric_hit(&metrics, "tombstone_create_or_get");
assert_metric_hit(&metrics, "parquet_create");
let catalog = clean_state().await;
test_topic(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "topic_create_or_get");
let catalog = clean_state().await;
test_namespace(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "namespace_create");
let catalog = clean_state().await;
test_table(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "table_create_or_get");
let catalog = clean_state().await;
test_column(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "column_create_or_get");
let catalog = clean_state().await;
test_shards(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "shard_create_or_get");
let catalog = clean_state().await;
test_partition(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "partition_create_or_get");
let catalog = clean_state().await;
test_tombstone(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "tombstone_create_or_get");
let catalog = clean_state().await;
test_parquet_file(Arc::clone(&catalog)).await;
assert_metric_hit(&catalog.metrics(), "parquet_create");
}
async fn test_setup(catalog: Arc<dyn Catalog>) {

View File

@ -1794,7 +1794,11 @@ mod tests {
#[tokio::test]
async fn test_catalog() {
let metrics = Arc::new(metric::Registry::default());
crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new(metrics))).await;
crate::interface::test_helpers::test_catalog(|| async {
let metrics = Arc::new(metric::Registry::default());
let x: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
x
})
.await;
}
}

View File

@ -2577,11 +2577,44 @@ mod tests {
let tz: String = sqlx::query_scalar("SHOW TIME ZONE;")
.fetch_one(&postgres.pool)
.await
.expect("read application_name");
.expect("read time zone");
assert_eq!(tz, "UTC");
let pool = postgres.pool.clone();
let schema_name = postgres.schema_name.clone();
let postgres: Arc<dyn Catalog> = Arc::new(postgres);
crate::interface::test_helpers::test_catalog(postgres).await;
crate::interface::test_helpers::test_catalog(|| async {
// Clean the schema.
pool
.execute(format!("DROP SCHEMA {schema_name} CASCADE").as_str())
.await
.expect("failed to clean schema between tests");
// Recreate the test schema
pool
.execute(format!("CREATE SCHEMA {schema_name};").as_str())
.await
.expect("failed to create test schema");
// Ensure the test user has permission to interact with the test schema.
pool
.execute(
format!(
"GRANT USAGE ON SCHEMA {schema_name} TO public; GRANT CREATE ON SCHEMA {schema_name} TO public;"
)
.as_str(),
)
.await
.expect("failed to grant privileges to schema");
// Run the migrations against this random schema.
postgres.setup().await.expect("failed to initialise database");
Arc::clone(&postgres)
})
.await;
}
#[tokio::test]

View File

@ -2371,9 +2371,12 @@ mod tests {
#[tokio::test]
async fn test_catalog() {
let sqlite = setup_db().await;
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
interface::test_helpers::test_catalog(sqlite).await;
interface::test_helpers::test_catalog(|| async {
let sqlite = setup_db().await;
let sqlite: Arc<dyn Catalog> = Arc::new(sqlite);
sqlite
})
.await;
}
#[tokio::test]