refactor: make postgres and mem catalog implementations public

pull/24376/head
Paul Dix 2022-01-17 14:46:10 -05:00
parent b3ee1032b3
commit dfe95e1a56
2 changed files with 64 additions and 38 deletions

View File

@ -7,12 +7,31 @@ use crate::interface::{
};
use async_trait::async_trait;
use std::convert::TryFrom;
use std::fmt::Formatter;
use std::sync::{Arc, Mutex};
struct MemCatalog {
/// In-memory catalog that implements the `RepoCollection` and individual repo traits fromt
/// the catalog interface.
#[derive(Default)]
pub struct MemCatalog {
collections: Mutex<MemCollections>,
}
impl MemCatalog {
/// return new initialized `MemCatalog`
pub fn new() -> Self {
Self::default()
}
}
impl std::fmt::Debug for MemCatalog {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let c = self.collections.lock().expect("mutex poisoned");
write!(f, "MemCatalog[ {:?} ]", c)
}
}
#[derive(Default, Debug)]
struct MemCollections {
kafka_topics: Vec<KafkaTopic>,
query_pools: Vec<QueryPool>,

View File

@ -16,41 +16,46 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
#[allow(dead_code)]
const SCHEMA_NAME: &str = "iox_catalog";
/// Connect to the catalog store.
pub async fn connect_catalog_store(
app_name: &'static str,
schema_name: &'static str,
dsn: &str,
) -> Result<Pool<Postgres>, sqlx::Error> {
let pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(MAX_CONNECTIONS)
.connect_timeout(CONNECT_TIMEOUT)
.idle_timeout(IDLE_TIMEOUT)
.test_before_acquire(true)
.after_connect(move |c| {
Box::pin(async move {
// Tag the connection with the provided application name.
c.execute(sqlx::query("SET application_name = '$1';").bind(app_name))
.await?;
let search_path_query = format!("SET search_path TO {}", schema_name);
c.execute(sqlx::query(&search_path_query)).await?;
Ok(())
})
})
.connect(dsn)
.await?;
// Log a connection was successfully established and include the application
// name for cross-correlation between Conductor logs & database connections.
info!(application_name=%app_name, "connected to catalog store");
Ok(pool)
/// In-memory catalog that implements the `RepoCollection` and individual repo traits.
#[derive(Debug)]
pub struct PostgresCatalog {
pool: Pool<Postgres>,
}
struct PostgresCatalog {
pool: Pool<Postgres>,
impl PostgresCatalog {
/// Connect to the catalog store.
pub async fn connect(
app_name: &'static str,
schema_name: &'static str,
dsn: &str,
) -> Result<Self> {
let pool = PgPoolOptions::new()
.min_connections(1)
.max_connections(MAX_CONNECTIONS)
.connect_timeout(CONNECT_TIMEOUT)
.idle_timeout(IDLE_TIMEOUT)
.test_before_acquire(true)
.after_connect(move |c| {
Box::pin(async move {
// Tag the connection with the provided application name.
c.execute(sqlx::query("SET application_name = '$1';").bind(app_name))
.await?;
let search_path_query = format!("SET search_path TO {}", schema_name);
c.execute(sqlx::query(&search_path_query)).await?;
Ok(())
})
})
.connect(dsn)
.await
.map_err(|e| Error::SqlxError { source: e })?;
// Log a connection was successfully established and include the application
// name for cross-correlation between Conductor logs & database connections.
info!(application_name=%app_name, "connected to catalog store");
Ok(Self { pool })
}
}
impl RepoCollection for Arc<PostgresCatalog> {
@ -348,6 +353,7 @@ fn is_fk_violation(e: &sqlx::Error) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::postgres::PostgresCatalog;
use crate::{
create_or_get_default_records, interface::NamespaceSchema, validate_or_insert_schema,
};
@ -395,10 +401,11 @@ mod tests {
async fn setup_db() -> (Arc<PostgresCatalog>, KafkaTopic, QueryPool) {
let dsn = std::env::var("DATABASE_URL").unwrap();
let pool = connect_catalog_store("test", SCHEMA_NAME, &dsn)
.await
.unwrap();
let postgres_catalog = Arc::new(PostgresCatalog { pool });
let postgres_catalog = Arc::new(
PostgresCatalog::connect("test", SCHEMA_NAME, &dsn)
.await
.unwrap(),
);
let (kafka_topic, query_pool, _) = create_or_get_default_records(2, &postgres_catalog)
.await