From 74c251febbf9e2e7ea96e0ac99b352e539d417ad Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 31 Jan 2022 15:07:38 +0000 Subject: [PATCH] feat: allow IOx catalog to setup itself (no SQLx CLI required) (#3584) * feat: allow IOx catalog to setup itself (no SQLx CLI required) * refactor: use SQLx macro instead of hand-rolled build script --- .circleci/config.yml | 1 - iox_catalog/README.md | 3 +-- iox_catalog/build.rs | 5 +++++ .../20211229171744_initial_schema.sql | 3 --- iox_catalog/src/interface.rs | 17 +++++++++++++++++ iox_catalog/src/mem.rs | 6 ++++++ iox_catalog/src/postgres.rs | 17 +++++++++++++++-- 7 files changed, 44 insertions(+), 8 deletions(-) create mode 100644 iox_catalog/build.rs diff --git a/.circleci/config.yml b/.circleci/config.yml index 575136be05..bf8f856f9d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -237,7 +237,6 @@ jobs: - cache_restore - run: cargo install sqlx-cli - run: sqlx database create - - run: cd iox_catalog && sqlx migrate run && cd .. - run: name: Cargo test command: cargo test --workspace --features=aws,azure,azure_test,kafka diff --git a/iox_catalog/README.md b/iox_catalog/README.md index d01cc15250..d53cda9e25 100644 --- a/iox_catalog/README.md +++ b/iox_catalog/README.md @@ -11,12 +11,11 @@ user and password filled in: DATABASE_URL=postgres://:@localhost/iox_shared ``` -You'll then need to create the database and run the migrations. You can do this via the sqlx command line. +You'll then need to create the database. You can do this via the sqlx command line. ``` cargo install sqlx-cli sqlx database create -sqlx migrate run ``` This will set up the database based on the files in `./migrations` in this crate. 
SQLx also creates a table diff --git a/iox_catalog/build.rs b/iox_catalog/build.rs new file mode 100644 index 0000000000..d5068697c7 --- /dev/null +++ b/iox_catalog/build.rs @@ -0,0 +1,5 @@ +// generated by `sqlx migrate build-script` +fn main() { + // trigger recompilation when a new migration is added + println!("cargo:rerun-if-changed=migrations"); +} diff --git a/iox_catalog/migrations/20211229171744_initial_schema.sql b/iox_catalog/migrations/20211229171744_initial_schema.sql index 6c8606ec73..bf812da6e1 100644 --- a/iox_catalog/migrations/20211229171744_initial_schema.sql +++ b/iox_catalog/migrations/20211229171744_initial_schema.sql @@ -1,7 +1,5 @@ -- Add migration script here -- iox_shared schema -BEGIN; - CREATE SCHEMA IF NOT EXISTS iox_catalog; CREATE TABLE IF NOT EXISTS iox_catalog.kafka_topic @@ -233,4 +231,3 @@ ALTER TABLE IF EXISTS iox_catalog.processed_tombstone ON UPDATE NO ACTION ON DELETE NO ACTION NOT VALID; -END; diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 44f8ec97f1..31d4114c1f 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -256,22 +256,33 @@ impl std::fmt::Display for ParquetFileId { /// Trait that contains methods for working with the catalog #[async_trait] pub trait Catalog: Send + Sync + Debug { + /// Setup catalog for usage and apply possible migrations. 
+ async fn setup(&self) -> Result<(), Error>; + /// repo for kafka topics fn kafka_topics(&self) -> &dyn KafkaTopicRepo; + /// repo for query pools fn query_pools(&self) -> &dyn QueryPoolRepo; + /// repo for namespaces fn namespaces(&self) -> &dyn NamespaceRepo; + /// repo for tables fn tables(&self) -> &dyn TableRepo; + /// repo for columns fn columns(&self) -> &dyn ColumnRepo; + /// repo for sequencers fn sequencers(&self) -> &dyn SequencerRepo; + /// repo for partitions fn partitions(&self) -> &dyn PartitionRepo; + /// repo for tombstones fn tombstones(&self) -> &dyn TombstoneRepo; + /// repo for parquet_files fn parquet_files(&self) -> &dyn ParquetFileRepo; } @@ -839,6 +850,7 @@ pub(crate) mod test_helpers { use std::sync::Arc; pub(crate) async fn test_catalog(catalog: Arc) { + test_setup(Arc::clone(&catalog)).await; test_kafka_topic(Arc::clone(&catalog)).await; test_query_pool(Arc::clone(&catalog)).await; test_namespace(Arc::clone(&catalog)).await; @@ -850,6 +862,11 @@ pub(crate) mod test_helpers { test_parquet_file(Arc::clone(&catalog)).await; } + async fn test_setup(catalog: Arc) { + catalog.setup().await.expect("first catalog setup"); + catalog.setup().await.expect("second catalog setup"); + } + async fn test_kafka_topic(catalog: Arc) { let kafka_repo = catalog.kafka_topics(); let k = kafka_repo.create_or_get("foo").await.unwrap(); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index b5e634ea81..9c9f75b2e1 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -48,7 +48,13 @@ struct MemCollections { parquet_files: Vec, } +#[async_trait] impl Catalog for MemCatalog { + async fn setup(&self) -> Result<(), Error> { + // nothing to do + Ok(()) + } + fn kafka_topics(&self) -> &dyn KafkaTopicRepo { self } diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index d790500427..862f4addac 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -9,7 +9,7 @@ use crate::interface::{ }; use 
async_trait::async_trait; use observability_deps::tracing::info; -use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres}; +use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Executor, Pool, Postgres}; use std::time::Duration; use uuid::Uuid; @@ -19,6 +19,8 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(500); /// the default schema name to use in Postgres pub const SCHEMA_NAME: &str = "iox_catalog"; +static MIGRATOR: Migrator = sqlx::migrate!(); + /// In-memory catalog that implements the `RepoCollection` and individual repo traits. #[derive(Debug)] pub struct PostgresCatalog { @@ -43,7 +45,7 @@ impl PostgresCatalog { // Tag the connection with the provided application name. c.execute(sqlx::query("SET application_name = '$1';").bind(app_name)) .await?; - let search_path_query = format!("SET search_path TO {}", schema_name); + let search_path_query = format!("SET search_path TO public,{}", schema_name); c.execute(sqlx::query(&search_path_query)).await?; Ok(()) @@ -61,7 +63,17 @@ impl PostgresCatalog { } } +#[async_trait] impl Catalog for PostgresCatalog { + async fn setup(&self) -> Result<(), Error> { + MIGRATOR + .run(&self.pool) + .await + .map_err(|e| Error::SqlxError { source: e.into() })?; + + Ok(()) + } + fn kafka_topics(&self) -> &dyn KafkaTopicRepo { self } @@ -639,6 +651,7 @@ mod tests { maybe_skip_integration!(); let postgres = setup_db().await; + postgres.setup().await.unwrap(); clear_schema(&postgres.pool).await; let postgres: Arc = Arc::new(postgres);