From bfa54033bdc2a1d147f5d4b236867d5c0998518f Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 21 Jan 2022 16:01:13 -0500 Subject: [PATCH] refactor: Clean up the Catalog API This updates the catalog API to make it easier to work with for consumers. I also found a bug in the MemCatalog implementation while refactoring the tests to work with the new API definition. Consumers will now be able to Arc wrap the catalog and use it across awaits. --- ingester/src/data.rs | 10 +- ingester/src/server.rs | 6 +- iox_catalog/src/interface.rs | 295 +++++++++++++++++++---------------- iox_catalog/src/lib.rs | 61 +++++--- iox_catalog/src/mem.rs | 81 +++++----- iox_catalog/src/postgres.rs | 63 ++++---- 6 files changed, 279 insertions(+), 237 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index b838faca54..35eba45818 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -7,8 +7,8 @@ use uuid::Uuid; use crate::server::IngesterServer; use iox_catalog::interface::{ - KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, RepoCollection, SequenceNumber, - SequencerId, TableId, Tombstone, + Catalog, KafkaPartition, KafkaTopicId, NamespaceId, PartitionId, SequenceNumber, SequencerId, + TableId, Tombstone, }; use mutable_batch::MutableBatch; use parking_lot::RwLock; @@ -54,11 +54,9 @@ pub struct Sequencers { impl Sequencers { /// One time initialize Sequencers of this Ingester - pub async fn initialize( - ingester: &IngesterServer<'_, T>, - ) -> Result { + pub async fn initialize(ingester: &IngesterServer<'_, T>) -> Result { // Get sequencer ids from the catalog - let sequencer_repro = ingester.iox_catalog.sequencer(); + let sequencer_repro = ingester.iox_catalog.sequencers(); let mut sequencers = BTreeMap::default(); let topic = ingester.get_topic(); for shard in ingester.get_kafka_partitions() { diff --git a/ingester/src/server.rs b/ingester/src/server.rs index 11ce6dc553..7c48be5389 100644 --- a/ingester/src/server.rs +++ b/ingester/src/server.rs @@ -3,13 +3,13 @@ use std::sync::Arc; -use iox_catalog::interface::{KafkaPartition, KafkaTopic, KafkaTopicId, RepoCollection}; +use iox_catalog::interface::{Catalog, KafkaPartition, KafkaTopic, KafkaTopicId}; /// The [`IngesterServer`] manages the lifecycle and contains all state for /// an `ingester` server instance. pub struct IngesterServer<'a, T> where - T: RepoCollection + Send + Sync, + T: Catalog, { /// Kafka Topic assigned to this ingester kafka_topic: KafkaTopic, @@ -21,7 +21,7 @@ where impl<'a, T> IngesterServer<'a, T> where - T: RepoCollection + Send + Sync, + T: Catalog, { /// Initialize the Ingester pub fn new(topic: KafkaTopic, shard_ids: Vec, catalog: &'a Arc) -> Self { diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index d72e91a4ee..01431ae969 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -6,7 +6,6 @@ use snafu::{OptionExt, Snafu}; use std::collections::BTreeMap; use std::convert::TryFrom; use std::fmt::Formatter; -use std::sync::Arc; use uuid::Uuid; #[derive(Debug, Snafu)] @@ -247,32 +246,32 @@ impl std::fmt::Display for ParquetFileId { } } -/// Container that can return repos for each of the catalog data types. 
+/// Trait that contains methods for working with the catalog #[async_trait] -pub trait RepoCollection { +pub trait Catalog: Send + Sync { /// repo for kafka topics - fn kafka_topic(&self) -> Arc; + fn kafka_topics(&self) -> &dyn KafkaTopicRepo; /// repo fo rquery pools - fn query_pool(&self) -> Arc; + fn query_pools(&self) -> &dyn QueryPoolRepo; /// repo for namespaces - fn namespace(&self) -> Arc; + fn namespaces(&self) -> &dyn NamespaceRepo; /// repo for tables - fn table(&self) -> Arc; + fn tables(&self) -> &dyn TableRepo; /// repo for columns - fn column(&self) -> Arc; + fn columns(&self) -> &dyn ColumnRepo; /// repo for sequencers - fn sequencer(&self) -> Arc; + fn sequencers(&self) -> &dyn SequencerRepo; /// repo for partitions - fn partition(&self) -> Arc; + fn partitions(&self) -> &dyn PartitionRepo; /// repo for tombstones - fn tombstone(&self) -> Arc; + fn tombstones(&self) -> &dyn TombstoneRepo; /// repo for parquet_files - fn parquet_file(&self) -> Arc; + fn parquet_files(&self) -> &dyn ParquetFileRepo; } /// Functions for working with Kafka topics in the catalog. #[async_trait] -pub trait KafkaTopicRepo { +pub trait KafkaTopicRepo: Send + Sync { /// Creates the kafka topic in the catalog or gets the existing record by name. async fn create_or_get(&self, name: &str) -> Result; @@ -282,14 +281,14 @@ pub trait KafkaTopicRepo { /// Functions for working with query pools in the catalog. #[async_trait] -pub trait QueryPoolRepo { +pub trait QueryPoolRepo: Send + Sync { /// Creates the query pool in the catalog or gets the existing record by name. async fn create_or_get(&self, name: &str) -> Result; } /// Functions for working with namespaces in the catalog #[async_trait] -pub trait NamespaceRepo { +pub trait NamespaceRepo: Send + Sync { /// Creates the namespace in the catalog. If one by the same name already exists, an /// error is returned. async fn create( @@ -306,7 +305,7 @@ pub trait NamespaceRepo { /// Functions for working with tables in the catalog #[async_trait] -pub trait TableRepo { +pub trait TableRepo: Send + Sync { /// Creates the table in the catalog or get the existing record by name. async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result; @@ -316,7 +315,7 @@ pub trait TableRepo { /// Functions for working with columns in the catalog #[async_trait] -pub trait ColumnRepo { +pub trait ColumnRepo: Send + Sync { /// Creates the column in the catalog or returns the existing column. Will return a /// `Error::ColumnTypeMismatch` if the existing column type doesn't match the type /// the caller is attempting to create. @@ -333,7 +332,7 @@ pub trait ColumnRepo { /// Functions for working with sequencers in the catalog #[async_trait] -pub trait SequencerRepo { +pub trait SequencerRepo: Send + Sync { /// create a sequencer record for the kafka topic and partition or return the existing record async fn create_or_get( &self, @@ -358,7 +357,7 @@ pub trait SequencerRepo { /// Functions for working with IOx partitions in the catalog. Note that these are how /// IOx splits up data within a database, which is differenet than Kafka partitions. 
#[async_trait] -pub trait PartitionRepo { +pub trait PartitionRepo: Send + Sync { /// create or get a partition record for the given partition key, sequencer and table async fn create_or_get( &self, @@ -373,7 +372,7 @@ pub trait PartitionRepo { /// Functions for working with tombstones in the catalog #[async_trait] -pub trait TombstoneRepo { +pub trait TombstoneRepo: Send + Sync { /// create or get a tombstone async fn create_or_get( &self, @@ -397,7 +396,7 @@ pub trait TombstoneRepo { /// Functions for working with parquet file pointers in the catalog #[async_trait] -pub trait ParquetFileRepo { +pub trait ParquetFileRepo: Send + Sync { /// create the parquet file #[allow(clippy::too_many_arguments)] async fn create( @@ -519,22 +518,19 @@ impl NamespaceSchema { } /// Gets the namespace schema including all tables and columns. -pub async fn get_schema_by_name( +pub async fn get_schema_by_name( name: &str, - repo: &T, + catalog: &dyn Catalog, ) -> Result> { - let namespace_repo = repo.namespace(); - let table_repo = repo.table(); - let column_repo = repo.column(); - - let namespace = namespace_repo + let namespace = catalog + .namespaces() .get_by_name(name) .await? .context(NamespaceNotFoundSnafu { name })?; // get the columns first just in case someone else is creating schema while we're doing this. - let columns = column_repo.list_by_namespace_id(namespace.id).await?; - let tables = table_repo.list_by_namespace_id(namespace.id).await?; + let columns = catalog.columns().list_by_namespace_id(namespace.id).await?; + let tables = catalog.tables().list_by_namespace_id(namespace.id).await?; let mut namespace = NamespaceSchema::new( namespace.id, @@ -813,25 +809,22 @@ pub struct ParquetFile { pub(crate) mod test_helpers { use super::*; use futures::{stream::FuturesOrdered, StreamExt}; + use std::sync::Arc; - pub(crate) async fn test_repo(new_repo: F) - where - T: RepoCollection + Send + Sync, - F: Fn() -> T + Send + Sync, - { - test_kafka_topic(&new_repo()).await; - test_query_pool(&new_repo()).await; - test_namespace(&new_repo()).await; - test_table(&new_repo()).await; - test_column(&new_repo()).await; - test_sequencer(&new_repo()).await; - test_partition(&new_repo()).await; - test_tombstone(&new_repo()).await; - test_parquet_file(&new_repo()).await; + pub(crate) async fn test_catalog(catalog: Arc) { + test_kafka_topic(Arc::clone(&catalog)).await; + test_query_pool(Arc::clone(&catalog)).await; + test_namespace(Arc::clone(&catalog)).await; + test_table(Arc::clone(&catalog)).await; + test_column(Arc::clone(&catalog)).await; + test_sequencer(Arc::clone(&catalog)).await; + test_partition(Arc::clone(&catalog)).await; + test_tombstone(Arc::clone(&catalog)).await; + test_parquet_file(Arc::clone(&catalog)).await; } - async fn test_kafka_topic(repo: &T) { - let kafka_repo = repo.kafka_topic(); + async fn test_kafka_topic(catalog: Arc) { + let kafka_repo = catalog.kafka_topics(); let k = kafka_repo.create_or_get("foo").await.unwrap(); assert!(k.id > KafkaTopicId::new(0)); assert_eq!(k.name, "foo"); @@ -843,8 +836,8 @@ pub(crate) mod test_helpers { assert!(k3.is_none()); } - async fn test_query_pool(repo: &T) { - let query_repo = repo.query_pool(); + async fn test_query_pool(catalog: Arc) { + let query_repo = catalog.query_pools(); let q = query_repo.create_or_get("foo").await.unwrap(); assert!(q.id > QueryPoolId::new(0)); assert_eq!(q.name, "foo"); @@ -852,10 +845,10 @@ pub(crate) mod test_helpers { assert_eq!(q, q2); } - async fn test_namespace(repo: &T) { - let namespace_repo = repo.namespace(); - let 
kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); + async fn test_namespace(catalog: Arc) { + let namespace_repo = catalog.namespaces(); + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); let namespace_name = "test_namespace"; let namespace = namespace_repo @@ -881,53 +874,75 @@ pub(crate) mod test_helpers { assert_eq!(namespace, found); } - async fn test_table(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_table(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_table_test", "inf", kafka.id, pool.id) .await .unwrap(); // test we can create or get a table - let table_repo = repo.table(); - let t = table_repo + let t = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let tt = table_repo + let tt = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); assert!(t.id > TableId::new(0)); assert_eq!(t, tt); - let tables = table_repo.list_by_namespace_id(namespace.id).await.unwrap(); + let tables = catalog + .tables() + .list_by_namespace_id(namespace.id) + .await + .unwrap(); assert_eq!(vec![t], tables); + + // test we can create a table of the same name in a different namespace + let namespace2 = catalog + .namespaces() + .create("two", "inf", kafka.id, pool.id) + .await + .unwrap(); + assert_ne!(namespace, namespace2); + let test_table = catalog + .tables() + .create_or_get("test_table", namespace2.id) + .await + .unwrap(); + assert_ne!(tt, test_table); + assert_eq!(test_table.namespace_id, namespace2.id) } - async fn test_column(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_column(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_column_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); + assert_eq!(table.namespace_id, namespace.id); // test we can create or get a column - let column_repo = repo.column(); - let c = column_repo + let c = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::Tag) .await .unwrap(); - let cc = column_repo + let cc = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::Tag) .await .unwrap(); @@ -935,7 +950,8 @@ pub(crate) mod test_helpers { assert_eq!(c, cc); // test that attempting to create an already defined column of a different type returns error - let err = column_repo + let err = catalog + .columns() .create_or_get("column_test", table.id, ColumnType::U64) .await .expect_err("should error with wrong column type"); @@ -949,35 +965,40 @@ pub(crate) mod test_helpers { )); // test that we can create a column of the same name under a different table - let table2 = repo - .table() + let table2 = catalog + .tables() 
.create_or_get("test_table_2", namespace.id) .await .unwrap(); - let ccc = column_repo + let ccc = catalog + .columns() .create_or_get("column_test", table2.id, ColumnType::U64) .await .unwrap(); assert_ne!(c, ccc); - let columns = column_repo + let columns = catalog + .columns() .list_by_namespace_id(namespace.id) .await .unwrap(); assert_eq!(vec![c, ccc], columns); } - async fn test_sequencer(repo: &T) { - let kafka = repo - .kafka_topic() + async fn test_sequencer(catalog: Arc) { + let kafka = catalog + .kafka_topics() .create_or_get("sequencer_test") .await .unwrap(); - let sequencer_repo = repo.sequencer(); // Create 10 sequencers let created = (1..=10) - .map(|partition| sequencer_repo.create_or_get(&kafka, KafkaPartition::new(partition))) + .map(|partition| { + catalog + .sequencers() + .create_or_get(&kafka, KafkaPartition::new(partition)) + }) .collect::>() .map(|v| { let v = v.expect("failed to create sequencer"); @@ -987,7 +1008,8 @@ pub(crate) mod test_helpers { .await; // List them and assert they match - let listed = sequencer_repo + let listed = catalog + .sequencers() .list_by_kafka_topic(&kafka) .await .expect("failed to list sequencers") @@ -999,7 +1021,8 @@ pub(crate) mod test_helpers { // get by the sequencer id and partition let kafka_partition = KafkaPartition::new(1); - let sequencer = sequencer_repo + let sequencer = catalog + .sequencers() .get_by_topic_id_and_partition(kafka.id, kafka_partition) .await .unwrap() @@ -1007,42 +1030,45 @@ pub(crate) mod test_helpers { assert_eq!(kafka.id, sequencer.kafka_topic_id); assert_eq!(kafka_partition, sequencer.kafka_partition); - let sequencer = sequencer_repo + let sequencer = catalog + .sequencers() .get_by_topic_id_and_partition(kafka.id, KafkaPartition::new(523)) .await .unwrap(); assert!(sequencer.is_none()); } - async fn test_partition(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_partition(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_partition_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let other_sequencer = repo - .sequencer() + let other_sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(2)) .await .unwrap(); - let partition_repo = repo.partition(); - let created = ["foo", "bar"] .iter() - .map(|key| partition_repo.create_or_get(key, sequencer.id, table.id)) + .map(|key| { + catalog + .partitions() + .create_or_get(key, sequencer.id, table.id) + }) .collect::>() .map(|v| { let v = v.expect("failed to create partition"); @@ -1050,13 +1076,15 @@ pub(crate) mod test_helpers { }) .collect::>() .await; - let _ = partition_repo + let _ = catalog + .partitions() .create_or_get("asdf", other_sequencer.id, table.id) .await .unwrap(); // List them and assert they match - let listed = partition_repo + let listed = catalog + .partitions() .list_by_sequencer(sequencer.id) .await .expect("failed to list partitions") @@ -1067,34 +1095,34 @@ pub(crate) mod test_helpers { assert_eq!(created, listed); } 
- async fn test_tombstone(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_tombstone(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_tombstone_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let other_table = repo - .table() + let other_table = catalog + .tables() .create_or_get("other", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let tombstone_repo = repo.tombstone(); let min_time = Timestamp::new(1); let max_time = Timestamp::new(10); - let t1 = tombstone_repo + let t1 = catalog + .tombstones() .create_or_get( table.id, sequencer.id, @@ -1111,7 +1139,8 @@ pub(crate) mod test_helpers { assert_eq!(t1.min_time, min_time); assert_eq!(t1.max_time, max_time); assert_eq!(t1.serialized_predicate, "whatevs"); - let t2 = tombstone_repo + let t2 = catalog + .tombstones() .create_or_get( other_table.id, sequencer.id, @@ -1122,7 +1151,8 @@ pub(crate) mod test_helpers { ) .await .unwrap(); - let t3 = tombstone_repo + let t3 = catalog + .tombstones() .create_or_get( table.id, sequencer.id, @@ -1134,43 +1164,44 @@ pub(crate) mod test_helpers { .await .unwrap(); - let listed = tombstone_repo + let listed = catalog + .tombstones() .list_tombstones_by_sequencer_greater_than(sequencer.id, SequenceNumber::new(1)) .await .unwrap(); assert_eq!(vec![t2, t3], listed); } - async fn test_parquet_file(repo: &T) { - let kafka = repo.kafka_topic().create_or_get("foo").await.unwrap(); - let pool = repo.query_pool().create_or_get("foo").await.unwrap(); - let namespace = repo - .namespace() + async fn test_parquet_file(catalog: Arc) { + let kafka = catalog.kafka_topics().create_or_get("foo").await.unwrap(); + let pool = catalog.query_pools().create_or_get("foo").await.unwrap(); + let namespace = catalog + .namespaces() .create("namespace_parquet_file_test", "inf", kafka.id, pool.id) .await .unwrap(); - let table = repo - .table() + let table = catalog + .tables() .create_or_get("test_table", namespace.id) .await .unwrap(); - let other_table = repo - .table() + let other_table = catalog + .tables() .create_or_get("other", namespace.id) .await .unwrap(); - let sequencer = repo - .sequencer() + let sequencer = catalog + .sequencers() .create_or_get(&kafka, KafkaPartition::new(1)) .await .unwrap(); - let partition = repo - .partition() + let partition = catalog + .partitions() .create_or_get("one", sequencer.id, table.id) .await .unwrap(); - let other_partition = repo - .partition() + let other_partition = catalog + .partitions() .create_or_get("one", sequencer.id, other_table.id) .await .unwrap(); @@ -1178,7 +1209,7 @@ pub(crate) mod test_helpers { let min_time = Timestamp::new(1); let max_time = Timestamp::new(10); - let parquet_repo = repo.parquet_file(); + let parquet_repo = catalog.parquet_files(); let parquet_file = parquet_repo .create( sequencer.id, diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index a23de38af4..3c5c5f2356 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -12,8 +12,8 @@ )] use 
crate::interface::{ - column_type_from_field, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic, - NamespaceSchema, QueryPool, RepoCollection, Result, Sequencer, SequencerId, TableId, + column_type_from_field, Catalog, ColumnSchema, ColumnType, Error, KafkaPartition, KafkaTopic, + NamespaceSchema, QueryPool, Result, Sequencer, SequencerId, TableId, }; use futures::{stream::FuturesOrdered, StreamExt}; use influxdb_line_protocol::ParsedLine; @@ -36,10 +36,10 @@ pub mod postgres; /// If another writer attempts to create a column of the same name with a different /// type at the same time and beats this caller to it, an error will be returned. If another /// writer adds the same schema before this one, then this will load that schema here. -pub async fn validate_or_insert_schema( +pub async fn validate_or_insert_schema( lines: Vec>, schema: &NamespaceSchema, - repo: &T, + catalog: &dyn Catalog, ) -> Result> { // table name to table_id let mut new_tables: BTreeMap = BTreeMap::new(); @@ -66,8 +66,8 @@ pub async fn validate_or_insert_schema( None => { let entry = new_columns.entry(table.id).or_default(); if entry.get(key.as_str()).is_none() { - let column_repo = repo.column(); - let column = column_repo + let column = catalog + .columns() .create_or_get(key.as_str(), table.id, ColumnType::Tag) .await?; entry.insert( @@ -97,8 +97,8 @@ pub async fn validate_or_insert_schema( let entry = new_columns.entry(table.id).or_default(); if entry.get(key.as_str()).is_none() { let data_type = column_type_from_field(value); - let column_repo = repo.column(); - let column = column_repo + let column = catalog + .columns() .create_or_get(key.as_str(), table.id, data_type) .await?; entry.insert( @@ -113,15 +113,16 @@ pub async fn validate_or_insert_schema( } } None => { - let table_repo = repo.table(); - let new_table = table_repo.create_or_get(table_name, schema.id).await?; + let new_table = catalog + .tables() + .create_or_get(table_name, schema.id) + .await?; let new_table_columns = new_columns.entry(new_table.id).or_default(); - let column_repo = repo.column(); - if let Some(tagset) = &line.series.tag_set { for (key, _) in tagset { - let new_column = column_repo + let new_column = catalog + .columns() .create_or_get(key.as_str(), new_table.id, ColumnType::Tag) .await?; new_table_columns.insert( @@ -135,7 +136,8 @@ pub async fn validate_or_insert_schema( } for (key, value) in &line.field_set { let data_type = column_type_from_field(value); - let new_column = column_repo + let new_column = catalog + .columns() .create_or_get(key.as_str(), new_table.id, data_type) .await?; new_table_columns.insert( @@ -146,7 +148,8 @@ pub async fn validate_or_insert_schema( }, ); } - let time_column = column_repo + let time_column = catalog + .columns() .create_or_get(TIME_COLUMN, new_table.id, ColumnType::Time) .await?; new_table_columns.insert( @@ -173,19 +176,25 @@ pub async fn validate_or_insert_schema( /// Creates or gets records in the catalog for the shared kafka topic, query pool, and sequencers for /// each of the partitions. 
-pub async fn create_or_get_default_records( +pub async fn create_or_get_default_records( kafka_partition_count: i32, - repo: &T, + catalog: &dyn Catalog, ) -> Result<(KafkaTopic, QueryPool, BTreeMap)> { - let kafka_repo = repo.kafka_topic(); - let query_repo = repo.query_pool(); - let sequencer_repo = repo.sequencer(); - - let kafka_topic = kafka_repo.create_or_get(SHARED_KAFKA_TOPIC).await?; - let query_pool = query_repo.create_or_get(SHARED_QUERY_POOL).await?; + let kafka_topic = catalog + .kafka_topics() + .create_or_get(SHARED_KAFKA_TOPIC) + .await?; + let query_pool = catalog + .query_pools() + .create_or_get(SHARED_QUERY_POOL) + .await?; let sequencers = (1..=kafka_partition_count) - .map(|partition| sequencer_repo.create_or_get(&kafka_topic, KafkaPartition::new(partition))) + .map(|partition| { + catalog + .sequencers() + .create_or_get(&kafka_topic, KafkaPartition::new(partition)) + }) .collect::>() .map(|v| { let v = v.expect("failed to create sequencer"); @@ -207,13 +216,13 @@ mod tests { #[tokio::test] async fn test_validate_or_insert_schema() { - let repo = Arc::new(MemCatalog::new()); + let repo = MemCatalog::new(); let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &repo).await.unwrap(); let namespace_name = "validate_schema"; // now test with a new namespace let namespace = repo - .namespace() + .namespaces() .create(namespace_name, "inf", kafka_topic.id, query_pool.id) .await .unwrap(); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index c4cf0333b1..b5e634ea81 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -2,16 +2,16 @@ //! used for testing or for an IOx designed to run without catalog persistence. use crate::interface::{ - Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, - KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, - ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo, - RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, + Catalog, Column, ColumnId, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, + KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, + ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, + QueryPoolRepo, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, Timestamp, Tombstone, TombstoneId, TombstoneRepo, }; use async_trait::async_trait; use std::convert::TryFrom; use std::fmt::Formatter; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use uuid::Uuid; /// In-memory catalog that implements the `RepoCollection` and individual repo traits from @@ -48,41 +48,41 @@ struct MemCollections { parquet_files: Vec, } -impl RepoCollection for Arc { - fn kafka_topic(&self) -> Arc { - Self::clone(self) as Arc +impl Catalog for MemCatalog { + fn kafka_topics(&self) -> &dyn KafkaTopicRepo { + self } - fn query_pool(&self) -> Arc { - Self::clone(self) as Arc + fn query_pools(&self) -> &dyn QueryPoolRepo { + self } - fn namespace(&self) -> Arc { - Self::clone(self) as Arc + fn namespaces(&self) -> &dyn NamespaceRepo { + self } - fn table(&self) -> Arc { - Self::clone(self) as Arc + fn tables(&self) -> &dyn TableRepo { + self } - fn column(&self) -> Arc { - Self::clone(self) as Arc + fn columns(&self) -> &dyn ColumnRepo { + self } - fn sequencer(&self) -> Arc { - Self::clone(self) as Arc + fn sequencers(&self) -> &dyn SequencerRepo { + 
self } - fn partition(&self) -> Arc { - Self::clone(self) as Arc + fn partitions(&self) -> &dyn PartitionRepo { + self } - fn tombstone(&self) -> Arc { - Self::clone(self) as Arc + fn tombstones(&self) -> &dyn TombstoneRepo { + self } - fn parquet_file(&self) -> Arc { - Self::clone(self) as Arc + fn parquet_files(&self) -> &dyn ParquetFileRepo { + self } } @@ -180,7 +180,11 @@ impl TableRepo for MemCatalog { async fn create_or_get(&self, name: &str, namespace_id: NamespaceId) -> Result
{ let mut collections = self.collections.lock().expect("mutex poisoned"); - let table = match collections.tables.iter().find(|t| t.name == name) { + let table = match collections + .tables + .iter() + .find(|t| t.name == name && t.namespace_id == namespace_id) + { Some(t) => t, None => { let table = Table { @@ -250,18 +254,22 @@ impl ColumnRepo for MemCatalog { } async fn list_by_namespace_id(&self, namespace_id: NamespaceId) -> Result> { - let mut columns = vec![]; - let collections = self.collections.lock().expect("mutex poisoned"); - for t in collections + + let table_ids: Vec<_> = collections .tables .iter() .filter(|t| t.namespace_id == namespace_id) - { - for c in collections.columns.iter().filter(|c| c.table_id == t.id) { - columns.push(c.clone()); - } - } + .map(|t| t.id) + .collect(); + println!("tables: {:?}", collections.tables); + println!("table_ids: {:?}", table_ids); + let columns: Vec<_> = collections + .columns + .iter() + .filter(|c| table_ids.contains(&c.table_id)) + .cloned() + .collect(); Ok(columns) } @@ -488,11 +496,10 @@ impl ParquetFileRepo for MemCatalog { #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; #[tokio::test] - async fn test_mem_repo() { - let f = || Arc::new(MemCatalog::new()); - - crate::interface::test_helpers::test_repo(f).await; + async fn test_catalog() { + crate::interface::test_helpers::test_catalog(Arc::new(MemCatalog::new())).await; } } diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 2b052a9738..4fc600ea56 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1,16 +1,15 @@ //! A Postgres backed implementation of the Catalog use crate::interface::{ - Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, + Catalog, Column, ColumnRepo, ColumnType, Error, KafkaPartition, KafkaTopic, KafkaTopicId, KafkaTopicRepo, Namespace, NamespaceId, NamespaceRepo, ParquetFile, ParquetFileId, ParquetFileRepo, Partition, PartitionId, PartitionRepo, QueryPool, QueryPoolId, QueryPoolRepo, - RepoCollection, Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, - TableRepo, Timestamp, Tombstone, TombstoneRepo, + Result, SequenceNumber, Sequencer, SequencerId, SequencerRepo, Table, TableId, TableRepo, + Timestamp, Tombstone, TombstoneRepo, }; use async_trait::async_trait; use observability_deps::tracing::info; use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres}; -use std::sync::Arc; use std::time::Duration; use uuid::Uuid; @@ -62,41 +61,41 @@ impl PostgresCatalog { } } -impl RepoCollection for Arc { - fn kafka_topic(&self) -> Arc { - Self::clone(self) as Arc +impl Catalog for PostgresCatalog { + fn kafka_topics(&self) -> &dyn KafkaTopicRepo { + self } - fn query_pool(&self) -> Arc { - Self::clone(self) as Arc + fn query_pools(&self) -> &dyn QueryPoolRepo { + self } - fn namespace(&self) -> Arc { - Self::clone(self) as Arc + fn namespaces(&self) -> &dyn NamespaceRepo { + self } - fn table(&self) -> Arc { - Self::clone(self) as Arc + fn tables(&self) -> &dyn TableRepo { + self } - fn column(&self) -> Arc { - Self::clone(self) as Arc + fn columns(&self) -> &dyn ColumnRepo { + self } - fn sequencer(&self) -> Arc { - Self::clone(self) as Arc + fn sequencers(&self) -> &dyn SequencerRepo { + self } - fn partition(&self) -> Arc { - Self::clone(self) as Arc + fn partitions(&self) -> &dyn PartitionRepo { + self } - fn tombstone(&self) -> Arc { - Self::clone(self) as Arc + fn tombstones(&self) -> &dyn TombstoneRepo { + self } - fn parquet_file(&self) -> 
Arc { - Self::clone(self) as Arc + fn parquet_files(&self) -> &dyn ParquetFileRepo { + self } } @@ -586,6 +585,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool { mod tests { use super::*; use std::env; + use std::sync::Arc; // Helper macro to skip tests if TEST_INTEGRATION and the AWS environment variables are not set. macro_rules! maybe_skip_integration { @@ -624,17 +624,15 @@ mod tests { }}; } - async fn setup_db() -> Arc { + async fn setup_db() -> PostgresCatalog { let dsn = std::env::var("DATABASE_URL").unwrap(); - Arc::new( - PostgresCatalog::connect("test", SCHEMA_NAME, &dsn) - .await - .unwrap(), - ) + PostgresCatalog::connect("test", SCHEMA_NAME, &dsn) + .await + .unwrap() } #[tokio::test] - async fn test_repo() { + async fn test_catalog() { // If running an integration test on your laptop, this requires that you have Postgres // running and that you've done the sqlx migrations. See the README in this crate for // info to set it up. @@ -642,10 +640,9 @@ mod tests { let postgres = setup_db().await; clear_schema(&postgres.pool).await; + let postgres: Arc = Arc::new(postgres); - let f = || Arc::clone(&postgres); - - crate::interface::test_helpers::test_repo(f).await; + crate::interface::test_helpers::test_catalog(postgres).await; } async fn clear_schema(pool: &Pool) {
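
A minimal consumer-side sketch of the pattern this change enables follows. It assumes only what is visible in this patch: the Catalog trait with namespaces() and tables() accessors, NamespaceRepo::get_by_name, TableRepo::list_by_namespace_id, and the in-memory MemCatalog. The tokio main attribute and the namespace name are illustrative assumptions, not part of this patch.

    use std::sync::Arc;

    use iox_catalog::interface::Catalog;
    use iox_catalog::mem::MemCatalog;

    // Looks up a namespace by name and lists its tables. The catalog handle is a
    // plain Arc<dyn Catalog>, so it can be cloned cheaply and held across awaits.
    async fn list_namespace_tables(catalog: Arc<dyn Catalog>) {
        let namespace = catalog
            .namespaces()
            .get_by_name("example_namespace") // hypothetical namespace name
            .await
            .expect("catalog error")
            .expect("namespace not found");

        // Repos are borrowed from the catalog per call; no per-repo Arc is needed.
        let tables = catalog
            .tables()
            .list_by_namespace_id(namespace.id)
            .await
            .expect("catalog error");

        for table in tables {
            println!("table {:?} in namespace {:?}", table.name, namespace.id);
        }
    }

    #[tokio::main]
    async fn main() {
        // In tests the in-memory implementation stands in for the Postgres catalog.
        let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new());
        list_namespace_tables(Arc::clone(&catalog)).await;
    }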