feat: allow IOx catalog to setup itself (no SQLx CLI required) (#3584)

* feat: allow IOx catalog to setup itself (no SQLx CLI required)

* refactor: use SQLx macro instead of hand-rolled build script
pull/24376/head
Marco Neumann 2022-01-31 15:07:38 +00:00 committed by GitHub
parent c50fc8764d
commit 74c251febb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 44 additions and 8 deletions

View File

@ -237,7 +237,6 @@ jobs:
- cache_restore
- run: cargo install sqlx-cli
- run: sqlx database create
- run: cd iox_catalog && sqlx migrate run && cd ..
- run:
name: Cargo test
command: cargo test --workspace --features=aws,azure,azure_test,kafka

View File

@ -11,12 +11,11 @@ user and password filled in:
DATABASE_URL=postgres://<postgres user>:<postgres password>@localhost/iox_shared
```
You'll then need to create the database and run the migrations. You can do this via the sqlx command line.
You'll then need to create the database. You can do this via the sqlx command line.
```
cargo install sqlx-cli
sqlx database create
sqlx migrate run
```
This will set up the database based on the files in `./migrations` in this crate. SQLx also creates a table

5
iox_catalog/build.rs Normal file
View File

@ -0,0 +1,5 @@
// generated by `sqlx migrate build-script`

/// Build-script entry point.
///
/// Emits a `rerun-if-changed` directive so Cargo rebuilds this crate whenever
/// the `migrations` directory changes — necessary because the crate embeds the
/// migration files at compile time via `sqlx::migrate!()`.
fn main() {
    // trigger recompilation when a new migration is added
    const MIGRATIONS_DIR: &str = "migrations";
    println!("cargo:rerun-if-changed={}", MIGRATIONS_DIR);
}

View File

@ -1,7 +1,5 @@
-- Add migration script here
-- iox_shared schema
BEGIN;
CREATE SCHEMA IF NOT EXISTS iox_catalog;
CREATE TABLE IF NOT EXISTS iox_catalog.kafka_topic
@ -233,4 +231,3 @@ ALTER TABLE IF EXISTS iox_catalog.processed_tombstone
ON UPDATE NO ACTION
ON DELETE NO ACTION
NOT VALID;
END;

View File

@ -256,22 +256,33 @@ impl std::fmt::Display for ParquetFileId {
/// Trait that contains methods for working with the catalog
#[async_trait]
pub trait Catalog: Send + Sync + Debug {
    /// Setup catalog for usage and apply possible migrations.
    ///
    /// Implementations must make this safe to call on an already set-up
    /// catalog (the shared test helper invokes it twice in a row).
    async fn setup(&self) -> Result<(), Error>;

    /// repo for kafka topics
    fn kafka_topics(&self) -> &dyn KafkaTopicRepo;

    /// repo for query pools
    fn query_pools(&self) -> &dyn QueryPoolRepo;

    /// repo for namespaces
    fn namespaces(&self) -> &dyn NamespaceRepo;

    /// repo for tables
    fn tables(&self) -> &dyn TableRepo;

    /// repo for columns
    fn columns(&self) -> &dyn ColumnRepo;

    /// repo for sequencers
    fn sequencers(&self) -> &dyn SequencerRepo;

    /// repo for partitions
    fn partitions(&self) -> &dyn PartitionRepo;

    /// repo for tombstones
    fn tombstones(&self) -> &dyn TombstoneRepo;

    /// repo for parquet_files
    fn parquet_files(&self) -> &dyn ParquetFileRepo;
}
@ -839,6 +850,7 @@ pub(crate) mod test_helpers {
use std::sync::Arc;
pub(crate) async fn test_catalog(catalog: Arc<dyn Catalog>) {
test_setup(Arc::clone(&catalog)).await;
test_kafka_topic(Arc::clone(&catalog)).await;
test_query_pool(Arc::clone(&catalog)).await;
test_namespace(Arc::clone(&catalog)).await;
@ -850,6 +862,11 @@ pub(crate) mod test_helpers {
test_parquet_file(Arc::clone(&catalog)).await;
}
/// Verify that `Catalog::setup` succeeds, and that calling it again on an
/// already set-up catalog also succeeds (i.e. setup is repeat-safe).
async fn test_setup(catalog: Arc<dyn Catalog>) {
    for msg in ["first catalog setup", "second catalog setup"] {
        catalog.setup().await.expect(msg);
    }
}
async fn test_kafka_topic(catalog: Arc<dyn Catalog>) {
let kafka_repo = catalog.kafka_topics();
let k = kafka_repo.create_or_get("foo").await.unwrap();

View File

@ -48,7 +48,13 @@ struct MemCollections {
parquet_files: Vec<ParquetFile>,
}
#[async_trait]
impl Catalog for MemCatalog {
async fn setup(&self) -> Result<(), Error> {
    // The in-memory catalog keeps its state in plain collections and has no
    // schema to migrate, so setup is a no-op that always succeeds.
    Ok(())
}
// `MemCatalog` implements `KafkaTopicRepo` itself, so it hands out a
// reference to itself rather than delegating to a separate repo object.
fn kafka_topics(&self) -> &dyn KafkaTopicRepo {
    self
}

View File

@ -9,7 +9,7 @@ use crate::interface::{
};
use async_trait::async_trait;
use observability_deps::tracing::info;
use sqlx::{postgres::PgPoolOptions, Executor, Pool, Postgres};
use sqlx::{migrate::Migrator, postgres::PgPoolOptions, Executor, Pool, Postgres};
use std::time::Duration;
use uuid::Uuid;
@ -19,6 +19,8 @@ const IDLE_TIMEOUT: Duration = Duration::from_secs(500);
/// the default schema name to use in Postgres
pub const SCHEMA_NAME: &str = "iox_catalog";
static MIGRATOR: Migrator = sqlx::migrate!();
/// In-memory catalog that implements the `RepoCollection` and individual repo traits.
#[derive(Debug)]
pub struct PostgresCatalog {
@ -43,7 +45,7 @@ impl PostgresCatalog {
// Tag the connection with the provided application name.
c.execute(sqlx::query("SET application_name = '$1';").bind(app_name))
.await?;
let search_path_query = format!("SET search_path TO {}", schema_name);
let search_path_query = format!("SET search_path TO public,{}", schema_name);
c.execute(sqlx::query(&search_path_query)).await?;
Ok(())
@ -61,7 +63,17 @@ impl PostgresCatalog {
}
}
#[async_trait]
impl Catalog for PostgresCatalog {
async fn setup(&self) -> Result<(), Error> {
MIGRATOR
.run(&self.pool)
.await
.map_err(|e| Error::SqlxError { source: e.into() })?;
Ok(())
}
// `PostgresCatalog` implements `KafkaTopicRepo` itself, so it hands out a
// reference to itself rather than delegating to a separate repo object.
fn kafka_topics(&self) -> &dyn KafkaTopicRepo {
    self
}
@ -639,6 +651,7 @@ mod tests {
maybe_skip_integration!();
let postgres = setup_db().await;
postgres.setup().await.unwrap();
clear_schema(&postgres.pool).await;
let postgres: Arc<dyn Catalog> = Arc::new(postgres);