diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index ed2821c426..398a16210d 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -7,12 +7,31 @@ use crate::interface::{ }; use async_trait::async_trait; use std::convert::TryFrom; +use std::fmt::Formatter; use std::sync::{Arc, Mutex}; -struct MemCatalog { +/// In-memory catalog that implements the `RepoCollection` and individual repo traits fromt +/// the catalog interface. +#[derive(Default)] +pub struct MemCatalog { collections: Mutex, } +impl MemCatalog { + /// return new initialized `MemCatalog` + pub fn new() -> Self { + Self::default() + } +} + +impl std::fmt::Debug for MemCatalog { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let c = self.collections.lock().expect("mutex poisoned"); + write!(f, "MemCatalog[ {:?} ]", c) + } +} + +#[derive(Default, Debug)] struct MemCollections { kafka_topics: Vec, query_pools: Vec, diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index dd49e27996..e5bcd20f4e 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -16,41 +16,46 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(500); #[allow(dead_code)] const SCHEMA_NAME: &str = "iox_catalog"; -/// Connect to the catalog store. -pub async fn connect_catalog_store( - app_name: &'static str, - schema_name: &'static str, - dsn: &str, -) -> Result, sqlx::Error> { - let pool = PgPoolOptions::new() - .min_connections(1) - .max_connections(MAX_CONNECTIONS) - .connect_timeout(CONNECT_TIMEOUT) - .idle_timeout(IDLE_TIMEOUT) - .test_before_acquire(true) - .after_connect(move |c| { - Box::pin(async move { - // Tag the connection with the provided application name. - c.execute(sqlx::query("SET application_name = '$1';").bind(app_name)) - .await?; - let search_path_query = format!("SET search_path TO {}", schema_name); - c.execute(sqlx::query(&search_path_query)).await?; - - Ok(()) - }) - }) - .connect(dsn) - .await?; - - // Log a connection was successfully established and include the application - // name for cross-correlation between Conductor logs & database connections. - info!(application_name=%app_name, "connected to catalog store"); - - Ok(pool) +/// In-memory catalog that implements the `RepoCollection` and individual repo traits. +#[derive(Debug)] +pub struct PostgresCatalog { + pool: Pool, } -struct PostgresCatalog { - pool: Pool, +impl PostgresCatalog { + /// Connect to the catalog store. + pub async fn connect( + app_name: &'static str, + schema_name: &'static str, + dsn: &str, + ) -> Result { + let pool = PgPoolOptions::new() + .min_connections(1) + .max_connections(MAX_CONNECTIONS) + .connect_timeout(CONNECT_TIMEOUT) + .idle_timeout(IDLE_TIMEOUT) + .test_before_acquire(true) + .after_connect(move |c| { + Box::pin(async move { + // Tag the connection with the provided application name. + c.execute(sqlx::query("SET application_name = '$1';").bind(app_name)) + .await?; + let search_path_query = format!("SET search_path TO {}", schema_name); + c.execute(sqlx::query(&search_path_query)).await?; + + Ok(()) + }) + }) + .connect(dsn) + .await + .map_err(|e| Error::SqlxError { source: e })?; + + // Log a connection was successfully established and include the application + // name for cross-correlation between Conductor logs & database connections. + info!(application_name=%app_name, "connected to catalog store"); + + Ok(Self { pool }) + } } impl RepoCollection for Arc { @@ -348,6 +353,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool { #[cfg(test)] mod tests { use super::*; + use crate::postgres::PostgresCatalog; use crate::{ create_or_get_default_records, interface::NamespaceSchema, validate_or_insert_schema, }; @@ -395,10 +401,11 @@ mod tests { async fn setup_db() -> (Arc, KafkaTopic, QueryPool) { let dsn = std::env::var("DATABASE_URL").unwrap(); - let pool = connect_catalog_store("test", SCHEMA_NAME, &dsn) - .await - .unwrap(); - let postgres_catalog = Arc::new(PostgresCatalog { pool }); + let postgres_catalog = Arc::new( + PostgresCatalog::connect("test", SCHEMA_NAME, &dsn) + .await + .unwrap(), + ); let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &postgres_catalog) .await