diff --git a/garbage_collector/src/objectstore/checker.rs b/garbage_collector/src/objectstore/checker.rs index a820f0a498..bab02cb001 100644 --- a/garbage_collector/src/objectstore/checker.rs +++ b/garbage_collector/src/objectstore/checker.rs @@ -137,10 +137,14 @@ mod tests { use super::*; use chrono::TimeZone; use data_types::{ - ColumnId, ColumnSet, CompactionLevel, NamespaceId, NamespaceName, ParquetFile, - ParquetFileParams, PartitionId, TableId, Timestamp, + ColumnId, ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileParams, + PartitionId, TableId, Timestamp, + }; + use iox_catalog::{ + interface::Catalog, + mem::MemCatalog, + test_helpers::{arbitrary_namespace, arbitrary_table}, }; - use iox_catalog::{interface::Catalog, mem::MemCatalog}; use object_store::path::Path; use once_cell::sync::Lazy; use parquet_file::ParquetFilePath; @@ -155,19 +159,8 @@ mod tests { let metric_registry = Arc::new(metric::Registry::new()); let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry))); let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create( - &NamespaceName::new("namespace_parquet_file_test").unwrap(), - None, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test").await; + let table = arbitrary_table(&mut *repos, "test_table", &namespace).await; let partition = repos .partitions() .create_or_get("one".into(), table.id) diff --git a/import/src/aggregate_tsm_schema/update_catalog.rs b/import/src/aggregate_tsm_schema/update_catalog.rs index 0813baadfd..1c501a0996 100644 --- a/import/src/aggregate_tsm_schema/update_catalog.rs +++ b/import/src/aggregate_tsm_schema/update_catalog.rs @@ -345,7 +345,10 @@ mod tests { use crate::{AggregateTSMField, AggregateTSMTag}; use assert_matches::assert_matches; use data_types::{PartitionId, TableId}; - use iox_catalog::mem::MemCatalog; + use iox_catalog::{ + mem::MemCatalog, + test_helpers::{arbitrary_namespace, arbitrary_table}, + }; use std::collections::HashSet; #[tokio::test] @@ -428,17 +431,9 @@ mod tests { .await .expect("started transaction"); // create namespace, table and columns for weather measurement - let namespace = txn - .namespaces() - .create(&NamespaceName::new("1234_5678").unwrap(), None) - .await - .expect("namespace created"); - let mut table = txn - .tables() - .create_or_get("weather", namespace.id) - .await - .map(|t| TableSchema::new_empty_from(&t)) - .expect("table created"); + let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await; + let table = arbitrary_table(&mut *txn, "weather", &namespace).await; + let mut table = TableSchema::new_empty_from(&table); let time_col = txn .columns() .create_or_get("time", table.id, ColumnType::Time) @@ -520,17 +515,9 @@ mod tests { .await .expect("started transaction"); // create namespace, table and columns for weather measurement - let namespace = txn - .namespaces() - .create(&NamespaceName::new("1234_5678").unwrap(), None) - .await - .expect("namespace created"); - let mut table = txn - .tables() - .create_or_get("weather", namespace.id) - .await - .map(|t| TableSchema::new_empty_from(&t)) - .expect("table created"); + let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await; + let table = arbitrary_table(&mut *txn, "weather", &namespace).await; + let mut table = TableSchema::new_empty_from(&table); let time_col = txn .columns() .create_or_get("time", table.id, ColumnType::Time) @@ -585,17 +572,9 @@ mod tests { .expect("started transaction"); // create namespace, table and columns for weather measurement - let namespace = txn - .namespaces() - .create(&NamespaceName::new("1234_5678").unwrap(), None) - .await - .expect("namespace created"); - let mut table = txn - .tables() - .create_or_get("weather", namespace.id) - .await - .map(|t| TableSchema::new_empty_from(&t)) - .expect("table created"); + let namespace = arbitrary_namespace(&mut *txn, "1234_5678").await; + let table = arbitrary_table(&mut *txn, "weather", &namespace).await; + let mut table = TableSchema::new_empty_from(&table); let time_col = txn .columns() .create_or_get("time", table.id, ColumnType::Time) diff --git a/ingester/src/buffer_tree/partition/resolver/catalog.rs b/ingester/src/buffer_tree/partition/resolver/catalog.rs index 86aa2767d4..a709aa3a31 100644 --- a/ingester/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester/src/buffer_tree/partition/resolver/catalog.rs @@ -99,6 +99,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; + use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table}; use super::*; @@ -114,18 +115,10 @@ mod tests { let (namespace_id, table_id) = { let mut repos = catalog.repositories().await; - let table_ns_name = data_types::NamespaceName::new(TABLE_NAME).unwrap(); - let ns = repos - .namespaces() - .create(&table_ns_name, None) - .await - .unwrap(); + let ns = arbitrary_namespace(&mut *repos, NAMESPACE_NAME).await; - let table = repos - .tables() - .create_or_get(TABLE_NAME, ns.id) - .await - .unwrap(); + let table = arbitrary_table(&mut *repos, TABLE_NAME, &ns) + .await; (ns.id, table.id) }; diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 6920ab7c08..2bda8c2721 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -2,7 +2,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId}; use dml::{DmlMeta, DmlWrite}; -use iox_catalog::interface::Catalog; +use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace}; use lazy_static::lazy_static; use mutable_batch_lp::lines_to_batches; use schema::Projection; @@ -298,13 +298,7 @@ pub(crate) async fn populate_catalog( table: &str, ) -> (NamespaceId, TableId) { let mut c = catalog.repositories().await; - let namespace_name = data_types::NamespaceName::new(namespace).unwrap(); - let ns_id = c - .namespaces() - .create(&namespace_name, None) - .await - .unwrap() - .id; + let ns_id = arbitrary_namespace(&mut *c, namespace).await.id; let table_id = c.tables().create_or_get(table, ns_id).await.unwrap().id; (ns_id, table_id) diff --git a/ingester_test_ctx/src/lib.rs b/ingester_test_ctx/src/lib.rs index 47f4860b1e..d3f7f68f44 100644 --- a/ingester_test_ctx/src/lib.rs +++ b/ingester_test_ctx/src/lib.rs @@ -17,8 +17,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use arrow::record_batch::RecordBatch; use arrow_flight::{decode::FlightRecordBatchStream, flight_service_server::FlightService, Ticket}; use data_types::{ - Namespace, NamespaceId, NamespaceName, NamespaceSchema, ParquetFile, PartitionKey, - SequenceNumber, TableId, + Namespace, NamespaceId, NamespaceSchema, ParquetFile, PartitionKey, SequenceNumber, TableId, }; use dml::{DmlMeta, DmlWrite}; use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt}; @@ -29,6 +28,7 @@ use ingester::{IngesterGuard, IngesterRpcInterface}; use ingester_query_grpc::influxdata::iox::ingester::v1::IngesterQueryRequest; use iox_catalog::{ interface::{Catalog, SoftDeletedRows}, + test_helpers::arbitrary_namespace, validate_or_insert_schema, }; use iox_time::TimeProvider; @@ -203,14 +203,8 @@ where name: &str, retention_period_ns: Option, ) -> Namespace { - let ns = self - .catalog - .repositories() - .await - .namespaces() - .create(&NamespaceName::new(name).unwrap(), None) - .await - .expect("failed to create test namespace"); + let mut repos = self.catalog.repositories().await; + let ns = arbitrary_namespace(&mut *repos, name).await; assert!( self.namespaces diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index ccba28e803..08f344a40c 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -736,7 +736,10 @@ pub async fn list_schemas( #[cfg(test)] pub(crate) mod test_helpers { - use crate::{validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES}; + use crate::{ + test_helpers::{arbitrary_namespace, arbitrary_table}, + validate_or_insert_schema, DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, + }; use super::*; use ::test_helpers::{assert_contains, tracing::TracingCapture}; @@ -846,12 +849,7 @@ pub(crate) mod test_helpers { .unwrap(); assert!(not_found.is_none()); - let namespace2_name = NamespaceName::new("test_namespace2").unwrap(); - let namespace2 = repos - .namespaces() - .create(&namespace2_name, None) - .await - .unwrap(); + let namespace2 = arbitrary_namespace(&mut *repos, "test_namespace2").await; let mut namespaces = repos .namespaces() .list(SoftDeletedRows::ExcludeDeleted) @@ -894,13 +892,8 @@ pub(crate) mod test_helpers { .expect("namespace should be updateable"); assert!(modified.retention_period_ns.is_none()); - // create namespace with retention period NULL - let namespace3_name = NamespaceName::new("test_namespace3").unwrap(); - let namespace3 = repos - .namespaces() - .create(&namespace3_name, None) - .await - .expect("namespace with NULL retention should be created"); + // create namespace with retention period NULL (the default) + let namespace3 = arbitrary_namespace(&mut *repos, "test_namespace3").await; assert!(namespace3.retention_period_ns.is_none()); // create namespace with retention period @@ -954,16 +947,8 @@ pub(crate) mod test_helpers { async fn test_namespace_soft_deletion(catalog: Arc) { let mut repos = catalog.repositories().await; - let deleted_ns = repos - .namespaces() - .create(&"deleted-ns".try_into().unwrap(), None) - .await - .unwrap(); - let active_ns = repos - .namespaces() - .create(&"active-ns".try_into().unwrap(), None) - .await - .unwrap(); + let deleted_ns = arbitrary_namespace(&mut *repos, "deleted-ns").await; + let active_ns = arbitrary_namespace(&mut *repos, "active-ns").await; // Mark "deleted-ns" as soft-deleted. repos.namespaces().soft_delete("deleted-ns").await.unwrap(); @@ -1117,23 +1102,11 @@ pub(crate) mod test_helpers { async fn test_table(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create(&NamespaceName::new("namespace_table_test").unwrap(), None) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "namespace_table_test").await; // test we can create or get a table - let t = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - let tt = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); + let t = arbitrary_table(&mut *repos, "test_table", &namespace).await; + let tt = arbitrary_table(&mut *repos, "test_table", &namespace).await; assert!(t.id > TableId::new(0)); assert_eq!(t, tt); @@ -1154,26 +1127,14 @@ pub(crate) mod test_helpers { assert_eq!(vec![t.clone()], tables); // test we can create a table of the same name in a different namespace - let namespace2 = repos - .namespaces() - .create(&NamespaceName::new("two").unwrap(), None) - .await - .unwrap(); + let namespace2 = arbitrary_namespace(&mut *repos, "two").await; assert_ne!(namespace, namespace2); - let test_table = repos - .tables() - .create_or_get("test_table", namespace2.id) - .await - .unwrap(); + let test_table = arbitrary_table(&mut *repos, "test_table", &namespace2).await; assert_ne!(tt, test_table); assert_eq!(test_table.namespace_id, namespace2.id); // test get by namespace and name - let foo_table = repos - .tables() - .create_or_get("foo", namespace2.id) - .await - .unwrap(); + let foo_table = arbitrary_table(&mut *repos, "foo", &namespace2).await; assert_eq!( repos .tables() @@ -1254,16 +1215,8 @@ pub(crate) mod test_helpers { async fn test_column(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create(&NamespaceName::new("namespace_column_test").unwrap(), None) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "namespace_column_test").await; + let table = arbitrary_table(&mut *repos, "test_table", &namespace).await; assert_eq!(table.namespace_id, namespace.id); // test we can create or get a column @@ -1290,11 +1243,7 @@ pub(crate) mod test_helpers { assert!(matches!(err, Error::ColumnTypeMismatch { .. })); // test that we can create a column of the same name under a different table - let table2 = repos - .tables() - .create_or_get("test_table_2", namespace.id) - .await - .unwrap(); + let table2 = arbitrary_table(&mut *repos, "test_table_2", &namespace).await; let ccc = repos .columns() .create_or_get("column_test", table2.id, ColumnType::U64) @@ -1361,11 +1310,7 @@ pub(crate) mod test_helpers { )); // test per-namespace column limits are NOT enforced with create_or_get_many_unchecked - let table3 = repos - .tables() - .create_or_get("test_table_3", namespace.id) - .await - .unwrap(); + let table3 = arbitrary_table(&mut *repos, "test_table_3", &namespace).await; let mut columns = HashMap::new(); columns.insert("apples", ColumnType::Tag); columns.insert("oranges", ColumnType::Tag); @@ -1387,19 +1332,8 @@ pub(crate) mod test_helpers { async fn test_partition(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create( - &NamespaceName::new("namespace_partition_test").unwrap(), - None, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "namespace_partition_test").await; + let table = arbitrary_table(&mut *repos, "test_table", &namespace).await; let mut created = BTreeMap::new(); for key in ["foo", "bar"] { @@ -1672,24 +1606,9 @@ pub(crate) mod test_helpers { /// tests many interactions with the catalog and parquet files. See the individual conditions herein async fn test_parquet_file(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create( - &NamespaceName::new("namespace_parquet_file_test").unwrap(), - None, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - let other_table = repos - .tables() - .create_or_get("other", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test").await; + let table = arbitrary_table(&mut *repos, "test_table", &namespace).await; + let other_table = arbitrary_table(&mut *repos, "other", &namespace).await; let partition = repos .partitions() .create_or_get("one".into(), table.id) @@ -1860,19 +1779,8 @@ pub(crate) mod test_helpers { assert_eq!(files.len(), 1); // test list_by_namespace_not_to_delete - let namespace2 = repos - .namespaces() - .create( - &NamespaceName::new("namespace_parquet_file_test1").unwrap(), - None, - ) - .await - .unwrap(); - let table2 = repos - .tables() - .create_or_get("test_table2", namespace2.id) - .await - .unwrap(); + let namespace2 = arbitrary_namespace(&mut *repos, "namespace_parquet_file_test1").await; + let table2 = arbitrary_table(&mut *repos, "test_table2", &namespace2).await; let partition2 = repos .partitions() .create_or_get("foo".into(), table2.id) @@ -2086,26 +1994,14 @@ pub(crate) mod test_helpers { async fn test_parquet_file_delete_broken(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace_1 = repos - .namespaces() - .create(&NamespaceName::new("retention_broken_1").unwrap(), None) - .await - .unwrap(); + let namespace_1 = arbitrary_namespace(&mut *repos, "retention_broken_1").await; let namespace_2 = repos .namespaces() .create(&NamespaceName::new("retention_broken_2").unwrap(), Some(1)) .await .unwrap(); - let table_1 = repos - .tables() - .create_or_get("test_table", namespace_1.id) - .await - .unwrap(); - let table_2 = repos - .tables() - .create_or_get("test_table", namespace_2.id) - .await - .unwrap(); + let table_1 = arbitrary_table(&mut *repos, "test_table", &namespace_1).await; + let table_2 = arbitrary_table(&mut *repos, "test_table", &namespace_2).await; let partition_1 = repos .partitions() .create_or_get("one".into(), table_1.id) @@ -2166,19 +2062,9 @@ pub(crate) mod test_helpers { async fn test_partitions_new_file_between(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create( - &NamespaceName::new("test_partitions_new_file_between").unwrap(), - None, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table_for_new_file_between", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "test_partitions_new_file_between").await; + let table = + arbitrary_table(&mut *repos, "test_table_for_new_file_between", &namespace).await; // param for the tests let time_now = Timestamp::from(catalog.time_provider().now()); @@ -2535,20 +2421,12 @@ pub(crate) mod test_helpers { async fn test_list_by_partiton_not_to_delete(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create( - &NamespaceName::new("namespace_parquet_file_test_list_by_partiton_not_to_delete") - .unwrap(), - None, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace( + &mut *repos, + "namespace_parquet_file_test_list_by_partiton_not_to_delete", + ) + .await; + let table = arbitrary_table(&mut *repos, "test_table", &namespace).await; let partition = repos .partitions() @@ -2646,19 +2524,9 @@ pub(crate) mod test_helpers { async fn test_update_to_compaction_level_1(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create( - &NamespaceName::new("namespace_update_to_compaction_level_1_test").unwrap(), - None, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("update_table", namespace.id) - .await - .unwrap(); + let namespace = + arbitrary_namespace(&mut *repos, "namespace_update_to_compaction_level_1_test").await; + let table = arbitrary_table(&mut *repos, "update_table", &namespace).await; let partition = repos .partitions() .create_or_get("test_update_to_compaction_level_1_one".into(), table.id) @@ -2735,19 +2603,9 @@ pub(crate) mod test_helpers { /// effective. async fn test_delete_namespace(catalog: Arc) { let mut repos = catalog.repositories().await; - let namespace_1 = repos - .namespaces() - .create( - &NamespaceName::new("namespace_test_delete_namespace_1").unwrap(), - None, - ) - .await - .unwrap(); - let table_1 = repos - .tables() - .create_or_get("test_table_1", namespace_1.id) - .await - .unwrap(); + let namespace_1 = + arbitrary_namespace(&mut *repos, "namespace_test_delete_namespace_1").await; + let table_1 = arbitrary_table(&mut *repos, "test_table_1", &namespace_1).await; let _c = repos .columns() .create_or_get("column_test_1", table_1.id, ColumnType::Tag) @@ -2793,19 +2651,9 @@ pub(crate) mod test_helpers { // we've now created a namespace with a table and parquet files. before we test deleting // it, let's create another so we can ensure that doesn't get deleted. - let namespace_2 = repos - .namespaces() - .create( - &NamespaceName::new("namespace_test_delete_namespace_2").unwrap(), - None, - ) - .await - .unwrap(); - let table_2 = repos - .tables() - .create_or_get("test_table_2", namespace_2.id) - .await - .unwrap(); + let namespace_2 = + arbitrary_namespace(&mut *repos, "namespace_test_delete_namespace_2").await; + let table_2 = arbitrary_table(&mut *repos, "test_table_2", &namespace_2).await; let _c = repos .columns() .create_or_get("column_test_2", table_2.id, ColumnType::Tag) @@ -2983,10 +2831,7 @@ pub(crate) mod test_helpers { barrier_captured.wait().await; let mut txn = catalog_captured.start_transaction().await.unwrap(); - txn.namespaces() - .create(&NamespaceName::new("test_txn_isolation").unwrap(), None) - .await - .unwrap(); + arbitrary_namespace(&mut *txn, "test_txn_isolation").await; tokio::time::sleep(Duration::from_millis(200)).await; txn.abort().await.unwrap(); @@ -3020,10 +2865,7 @@ pub(crate) mod test_helpers { async fn test_txn_drop(catalog: Arc) { let capture = TracingCapture::new(); let mut txn = catalog.start_transaction().await.unwrap(); - txn.namespaces() - .create(&NamespaceName::new("test_txn_drop").unwrap(), None) - .await - .unwrap(); + arbitrary_namespace(&mut *txn, "test_txn_drop").await; drop(txn); // got a warning diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index fa4cd9de9c..f03c73b088 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -201,6 +201,56 @@ where Ok(()) } +/// Catalog helper functions for creation of catalog objects +pub mod test_helpers { + use crate::RepoCollection; + use data_types::{Namespace, NamespaceName, Table}; + + /// When the details of the namespace don't matter; the test just needs *a* catalog namespace + /// with a particular name. + /// + /// Use [`NamespaceRepo::create`] directly if: + /// + /// - The values of the parameters to `create` need to be different than what's here + /// - The values of the parameters to `create` are relevant to the behavior under test + /// - You expect namespace creation to fail in the test + /// + /// [`NamespaceRepo::create`]: crate::interface::NamespaceRepo::create + pub async fn arbitrary_namespace( + repos: &mut R, + name: &str, + ) -> Namespace { + let namespace_name = NamespaceName::new(name).unwrap(); + repos + .namespaces() + .create(&namespace_name, None) + .await + .unwrap() + } + + /// When the details of the table don't matter; the test just needs *a* catalog table + /// with a particular name in a particular namespace. + /// + /// Use [`TableRepo::create_or_get`] directly if: + /// + /// - The values of the parameters to `create_or_get` need to be different than what's here + /// - The values of the parameters to `create_or_get` are relevant to the behavior under test + /// - You expect table creation to fail in the test + /// + /// [`TableRepo::create_or_get`]: crate::interface::TableRepo::create_or_get + pub async fn arbitrary_table( + repos: &mut R, + name: &str, + namespace: &Namespace, + ) -> Table { + repos + .tables() + .create_or_get(name, namespace.id) + .await + .unwrap() + } +} + #[cfg(test)] mod tests { use std::{collections::BTreeMap, sync::Arc}; @@ -210,7 +260,6 @@ mod tests { interface::{get_schema_by_name, SoftDeletedRows}, mem::MemCatalog, }; - use data_types::NamespaceName; // Generate a test that simulates multiple, sequential writes in `lp` and // asserts the resulting schema. @@ -228,21 +277,17 @@ mod tests { #[allow(clippy::bool_assert_comparison)] #[tokio::test] async fn []() { - use crate::interface::Catalog; + use crate::{interface::Catalog, test_helpers::arbitrary_namespace}; use std::ops::DerefMut; use pretty_assertions::assert_eq; const NAMESPACE_NAME: &str = "bananas"; - let ns_name = NamespaceName::new(NAMESPACE_NAME).unwrap(); let metrics = Arc::new(metric::Registry::default()); let repo = MemCatalog::new(metrics); let mut txn = repo.start_transaction().await.unwrap(); - let namespace = txn - .namespaces() - .create(&ns_name, None) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *txn, NAMESPACE_NAME) + .await; let schema = NamespaceSchema::new_empty_from(&namespace); diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index b470aff6cd..e1150f5b24 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1638,6 +1638,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool { #[cfg(test)] mod tests { use super::*; + use crate::test_helpers::{arbitrary_namespace, arbitrary_table}; use assert_matches::assert_matches; use data_types::{ColumnId, ColumnSet}; use metric::{Attributes, DurationHistogram, Metric}; @@ -1821,38 +1822,21 @@ mod tests { let postgres = setup_db().await; let postgres: Arc = Arc::new(postgres); + let mut repos = postgres.repositories().await; - let namespace_id = postgres - .repositories() - .await - .namespaces() - .create(&NamespaceName::new("ns4").unwrap(), None) - .await - .expect("namespace create failed") - .id; - let table_id = postgres - .repositories() - .await - .tables() - .create_or_get("table", namespace_id) - .await - .expect("create table failed") - .id; + let namespace = arbitrary_namespace(&mut *repos, "ns4").await; + let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id; let key = "bananas"; - let a = postgres - .repositories() - .await + let a = repos .partitions() .create_or_get(key.into(), table_id) .await .expect("should create OK"); // Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent. - let b = postgres - .repositories() - .await + let b = repos .partitions() .create_or_get(key.into(), table_id) .await @@ -1959,22 +1943,12 @@ mod tests { let postgres = setup_db().await; let metrics = Arc::clone(&postgres.metrics); let postgres: Arc = Arc::new(postgres); + let mut repos = postgres.repositories().await; - let namespace_id = postgres - .repositories() + let namespace = arbitrary_namespace(&mut *repos, "ns4") + .await; + let table_id = arbitrary_table(&mut *repos, "table", &namespace) .await - .namespaces() - .create(&NamespaceName::new("ns4").unwrap(), None) - .await - .expect("namespace create failed") - .id; - let table_id = postgres - .repositories() - .await - .tables() - .create_or_get("table", namespace_id) - .await - .expect("create table failed") .id; $( @@ -1983,9 +1957,7 @@ mod tests { insert.insert($col_name, $col_type); )+ - let got = postgres - .repositories() - .await + let got = repos .columns() .create_or_get_many_unchecked(table_id, insert.clone()) .await; @@ -2122,29 +2094,14 @@ mod tests { let postgres = setup_db().await; let pool = postgres.pool.clone(); let postgres: Arc = Arc::new(postgres); - - let namespace_id = postgres - .repositories() - .await - .namespaces() - .create(&NamespaceName::new("ns4").unwrap(), None) - .await - .expect("namespace create failed") - .id; - let table_id = postgres - .repositories() - .await - .tables() - .create_or_get("table", namespace_id) - .await - .expect("create table failed") - .id; + let mut repos = postgres.repositories().await; + let namespace = arbitrary_namespace(&mut *repos, "ns4").await; + let namespace_id = namespace.id; + let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id; let key = "bananas"; - let partition_id = postgres - .repositories() - .await + let partition_id = repos .partitions() .create_or_get(key.into(), table_id) .await @@ -2169,9 +2126,7 @@ mod tests { column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), max_l0_created_at: time_now, }; - let f1 = postgres - .repositories() - .await + let f1 = repos .parquet_files() .create(p1.clone()) .await @@ -2179,9 +2134,7 @@ mod tests { // insert the same again with a different size; we should then have 3x1337 as total file size p1.object_store_id = Uuid::new_v4(); p1.file_size_bytes *= 2; - let _f2 = postgres - .repositories() - .await + let _f2 = repos .parquet_files() .create(p1.clone()) .await @@ -2196,9 +2149,7 @@ mod tests { assert_eq!(total_file_size_bytes, 1337 * 3); // flag f1 for deletion and assert that the total file size is reduced accordingly. - postgres - .repositories() - .await + repos .parquet_files() .flag_for_delete(f1.id) .await @@ -2213,9 +2164,7 @@ mod tests { // actually deleting shouldn't change the total let now = Timestamp::from(time_provider.now()); - postgres - .repositories() - .await + repos .parquet_files() .delete_old_ids_only(now) .await diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index c7ff0ea9d8..3b294b5f6b 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -1508,6 +1508,7 @@ fn is_unique_violation(e: &sqlx::Error) -> bool { #[cfg(test)] mod tests { use super::*; + use crate::test_helpers::{arbitrary_namespace, arbitrary_table}; use assert_matches::assert_matches; use metric::{Attributes, DurationHistogram, Metric}; use std::sync::Arc; @@ -1549,40 +1550,22 @@ mod tests { #[tokio::test] async fn test_partition_create_or_get_idempotent() { let sqlite = setup_db().await; - let sqlite: Arc = Arc::new(sqlite); + let mut repos = sqlite.repositories().await; - let namespace_id = sqlite - .repositories() - .await - .namespaces() - .create(&NamespaceName::new("ns4").unwrap(), None) - .await - .expect("namespace create failed") - .id; - let table_id = sqlite - .repositories() - .await - .tables() - .create_or_get("table", namespace_id) - .await - .expect("create table failed") - .id; + let namespace = arbitrary_namespace(&mut *repos, "ns4").await; + let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id; let key = "bananas"; - let a = sqlite - .repositories() - .await + let a = repos .partitions() .create_or_get(key.into(), table_id) .await .expect("should create OK"); // Call create_or_get for the same (key, table_id) pair, to ensure the write is idempotent. - let b = sqlite - .repositories() - .await + let b = repos .partitions() .create_or_get(key.into(), table_id) .await @@ -1602,24 +1585,13 @@ mod tests { async fn []() { let sqlite = setup_db().await; let metrics = Arc::clone(&sqlite.metrics); - let sqlite: Arc = Arc::new(sqlite); + let mut repos = sqlite.repositories().await; - let namespace_id = sqlite - .repositories() + let namespace = arbitrary_namespace(&mut *repos, "ns4") + .await; + let table_id = arbitrary_table(&mut *repos, "table", &namespace) .await - .namespaces() - .create(&NamespaceName::new("ns4").unwrap(), None) - .await - .expect("namespace create failed") - .id; - let table_id = sqlite - .repositories() - .await - .tables() - .create_or_get("table", namespace_id) - .await - .expect("create table failed") .id; $( @@ -1628,9 +1600,7 @@ mod tests { insert.insert($col_name, $col_type); )+ - let got = sqlite - .repositories() - .await + let got = repos .columns() .create_or_get_many_unchecked(table_id, insert.clone()) .await; @@ -1764,31 +1734,16 @@ mod tests { async fn test_billing_summary_on_parqet_file_creation() { let sqlite = setup_db().await; let pool = sqlite.pool.clone(); - let sqlite: Arc = Arc::new(sqlite); + let mut repos = sqlite.repositories().await; - let namespace_id = sqlite - .repositories() - .await - .namespaces() - .create(&NamespaceName::new("ns4").unwrap(), None) - .await - .expect("namespace create failed") - .id; - let table_id = sqlite - .repositories() - .await - .tables() - .create_or_get("table", namespace_id) - .await - .expect("create table failed") - .id; + let namespace = arbitrary_namespace(&mut *repos, "ns4").await; + let namespace_id = namespace.id; + let table_id = arbitrary_table(&mut *repos, "table", &namespace).await.id; let key = "bananas"; - let partition_id = sqlite - .repositories() - .await + let partition_id = repos .partitions() .create_or_get(key.into(), table_id) .await @@ -1813,9 +1768,7 @@ mod tests { column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), max_l0_created_at: time_now, }; - let f1 = sqlite - .repositories() - .await + let f1 = repos .parquet_files() .create(p1.clone()) .await @@ -1823,9 +1776,7 @@ mod tests { // insert the same again with a different size; we should then have 3x1337 as total file size p1.object_store_id = Uuid::new_v4(); p1.file_size_bytes *= 2; - let _f2 = sqlite - .repositories() - .await + let _f2 = repos .parquet_files() .create(p1.clone()) .await @@ -1840,9 +1791,7 @@ mod tests { assert_eq!(total_file_size_bytes, 1337 * 3); // flag f1 for deletion and assert that the total file size is reduced accordingly. - sqlite - .repositories() - .await + repos .parquet_files() .flag_for_delete(f1.id) .await @@ -1857,9 +1806,7 @@ mod tests { // actually deleting shouldn't change the total let now = Timestamp::from(time_provider.now()); - sqlite - .repositories() - .await + repos .parquet_files() .delete_old_ids_only(now) .await diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index 859826caed..a1c2ad3fcf 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -16,6 +16,7 @@ use iox_catalog::{ get_schema_by_id, get_table_columns_by_id, Catalog, PartitionRepo, SoftDeletedRows, }, mem::MemCatalog, + test_helpers::arbitrary_table, }; use iox_query::{ exec::{DedicatedExecutors, Executor, ExecutorConfig}, @@ -220,11 +221,7 @@ impl TestNamespace { pub async fn create_table(self: &Arc, name: &str) -> Arc { let mut repos = self.catalog.catalog.repositories().await; - let table = repos - .tables() - .create_or_get(name, self.namespace.id) - .await - .unwrap(); + let table = arbitrary_table(&mut *repos, name, &self.namespace).await; Arc::new(TestTable { catalog: Arc::clone(&self.catalog), diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index aba7a9e3bf..216090e7d8 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -382,7 +382,10 @@ where #[cfg(test)] mod tests { use data_types::ColumnType; - use iox_catalog::mem::MemCatalog; + use iox_catalog::{ + mem::MemCatalog, + test_helpers::{arbitrary_namespace, arbitrary_table}, + }; use super::*; @@ -391,17 +394,9 @@ mod tests { let catalog = Arc::new(MemCatalog::new(Default::default())); let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create(&NamespaceName::new("test_ns").unwrap(), None) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "test_ns").await; - let table = repos - .tables() - .create_or_get("name", namespace.id) - .await - .unwrap(); + let table = arbitrary_table(&mut *repos, "name", &namespace).await; let _column = repos .columns() .create_or_get("name", table.id, ColumnType::U64) diff --git a/router/tests/http.rs b/router/tests/http.rs index 226a664735..5b5cb65dcd 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -5,7 +5,7 @@ use futures::{stream::FuturesUnordered, StreamExt}; use generated_types::influxdata::{iox::ingester::v1::WriteRequest, pbdata::v1::DatabaseBatch}; use hashbrown::HashMap; use hyper::{Body, Request, StatusCode}; -use iox_catalog::interface::SoftDeletedRows; +use iox_catalog::{interface::SoftDeletedRows, test_helpers::arbitrary_namespace}; use iox_time::{SystemProvider, TimeProvider}; use metric::{Attributes, DurationHistogram, Metric, U64Counter}; use router::dml_handlers::{DmlError, RetentionError, SchemaError}; @@ -265,17 +265,7 @@ async fn test_write_propagate_ids() { .await; // Create the namespace and a set of tables. - let ns = ctx - .catalog() - .repositories() - .await - .namespaces() - .create( - &data_types::NamespaceName::new("bananas_test").unwrap(), - None, - ) - .await - .expect("failed to update table limit"); + let ns = arbitrary_namespace(&mut *ctx.catalog().repositories().await, "bananas_test").await; let catalog = ctx.catalog(); let ids = ["another", "test", "table", "platanos"] diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index bdbb2a5334..8d9a439f66 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -197,11 +197,12 @@ fn to_partition(p: data_types::Partition) -> Partition { #[cfg(test)] mod tests { use super::*; - use data_types::{ - ColumnId, ColumnSet, CompactionLevel, NamespaceName, ParquetFileParams, Timestamp, - }; + use data_types::{ColumnId, ColumnSet, CompactionLevel, ParquetFileParams, Timestamp}; use generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService; - use iox_catalog::mem::MemCatalog; + use iox_catalog::{ + mem::MemCatalog, + test_helpers::{arbitrary_namespace, arbitrary_table}, + }; use uuid::Uuid; #[tokio::test] @@ -214,16 +215,8 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let catalog = Arc::new(MemCatalog::new(metrics)); let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create(&NamespaceName::new("catalog_partition_test").unwrap(), None) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("schema_test_table", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await; + let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await; let partition = repos .partitions() .create_or_get("foo".into(), table.id) @@ -277,16 +270,8 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let catalog = Arc::new(MemCatalog::new(metrics)); let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create(&NamespaceName::new("catalog_partition_test").unwrap(), None) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("schema_test_table", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await; + let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await; partition1 = repos .partitions() .create_or_get("foo".into(), table.id) diff --git a/service_grpc_object_store/src/lib.rs b/service_grpc_object_store/src/lib.rs index 293ea31ff3..d892ac99ca 100644 --- a/service_grpc_object_store/src/lib.rs +++ b/service_grpc_object_store/src/lib.rs @@ -96,11 +96,12 @@ impl object_store_service_server::ObjectStoreService for ObjectStoreService { mod tests { use super::*; use bytes::Bytes; - use data_types::{ - ColumnId, ColumnSet, CompactionLevel, NamespaceName, ParquetFileParams, Timestamp, - }; + use data_types::{ColumnId, ColumnSet, CompactionLevel, ParquetFileParams, Timestamp}; use generated_types::influxdata::iox::object_store::v1::object_store_service_server::ObjectStoreService; - use iox_catalog::mem::MemCatalog; + use iox_catalog::{ + mem::MemCatalog, + test_helpers::{arbitrary_namespace, arbitrary_table}, + }; use object_store::{memory::InMemory, ObjectStore}; use uuid::Uuid; @@ -112,16 +113,8 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let catalog = Arc::new(MemCatalog::new(metrics)); let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create(&NamespaceName::new("catalog_partition_test").unwrap(), None) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("schema_test_table", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "catalog_partition_test").await; + let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await; let partition = repos .partitions() .create_or_get("foo".into(), table.id) diff --git a/service_grpc_schema/src/lib.rs b/service_grpc_schema/src/lib.rs index 318417c473..76f579b0fe 100644 --- a/service_grpc_schema/src/lib.rs +++ b/service_grpc_schema/src/lib.rs @@ -81,9 +81,12 @@ fn schema_to_proto(schema: Arc) -> GetSchemaRespons #[cfg(test)] mod tests { use super::*; - use data_types::{ColumnType, NamespaceName}; + use data_types::ColumnType; use generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService; - use iox_catalog::mem::MemCatalog; + use iox_catalog::{ + mem::MemCatalog, + test_helpers::{arbitrary_namespace, arbitrary_table}, + }; use std::sync::Arc; #[tokio::test] @@ -93,16 +96,8 @@ mod tests { let metrics = Arc::new(metric::Registry::default()); let catalog = Arc::new(MemCatalog::new(metrics)); let mut repos = catalog.repositories().await; - let namespace = repos - .namespaces() - .create(&NamespaceName::new("namespace_schema_test").unwrap(), None) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("schema_test_table", namespace.id) - .await - .unwrap(); + let namespace = arbitrary_namespace(&mut *repos, "namespace_schema_test").await; + let table = arbitrary_table(&mut *repos, "schema_test_table", &namespace).await; repos .columns() .create_or_get("schema_test_column", table.id, ColumnType::Tag)