diff --git a/parquet_file/src/catalog/cleanup.rs b/parquet_file/src/catalog/cleanup.rs index a43eb00513..6e04eafe1a 100644 --- a/parquet_file/src/catalog/cleanup.rs +++ b/parquet_file/src/catalog/cleanup.rs @@ -54,18 +54,19 @@ pub type Result = std::result::Result; /// The exclusive access can be dropped after this method returned and before calling /// [`delete_files`]. pub async fn get_unreferenced_parquet_files( - db_name: &str, catalog: &PreservedCatalog, max_files: usize, ) -> Result> { let iox_object_store = catalog.iox_object_store(); let all_known = { // replay catalog transactions to track ALL (even dropped) files that are referenced - let (_catalog, state) = - PreservedCatalog::load::(db_name, catalog.config(), ()) - .await - .context(CatalogLoadError)? - .expect("catalog gone while reading it?"); + let (_catalog, state) = PreservedCatalog::load::( + catalog.config(), + TracerCatalogState::default(), + ) + .await + .context(CatalogLoadError)? + .expect("catalog gone while reading it?"); state.files.into_inner() }; @@ -120,19 +121,12 @@ pub async fn delete_files(catalog: &PreservedCatalog, files: &[ParquetFilePath]) } /// Catalog state that traces all used parquet files. +#[derive(Debug, Default)] struct TracerCatalogState { files: Mutex>, } impl CatalogState for TracerCatalogState { - type EmptyInput = (); - - fn new_empty(_db_name: &str, _data: Self::EmptyInput) -> Self { - Self { - files: Default::default(), - } - } - fn add( &mut self, _iox_object_store: Arc, @@ -161,8 +155,8 @@ impl CatalogState for TracerCatalogState { mod tests { use super::*; use crate::{ - catalog::test_helpers::{new_empty, DB_NAME}, - test_utils::{chunk_addr, make_config, make_metadata, TestSize}, + catalog::test_helpers::{make_config, new_empty}, + test_utils::{chunk_addr, make_metadata, TestSize}, }; use std::{collections::HashSet, sync::Arc}; use tokio::sync::RwLock; @@ -171,10 +165,10 @@ mod tests { async fn test_cleanup_empty() { let config = make_config().await; - let (catalog, _state) = new_empty(config).await; + let catalog = new_empty(config).await; // run clean-up - let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000) + let files = get_unreferenced_parquet_files(&catalog, 1_000) .await .unwrap(); delete_files(&catalog, &files).await.unwrap(); @@ -185,7 +179,7 @@ mod tests { let config = make_config().await; let iox_object_store = &config.iox_object_store; - let (catalog, _state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; // create some data let mut paths_keep = vec![]; @@ -229,7 +223,7 @@ mod tests { } // run clean-up - let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000) + let files = get_unreferenced_parquet_files(&catalog, 1_000) .await .unwrap(); delete_files(&catalog, &files).await.unwrap(); @@ -253,7 +247,7 @@ mod tests { let iox_object_store = &config.iox_object_store; let lock: RwLock<()> = Default::default(); - let (catalog, _state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; // try multiple times to provoke a conflict for i in 0..100 { @@ -287,7 +281,7 @@ mod tests { }, async { let guard = lock.write().await; - let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000) + let files = get_unreferenced_parquet_files(&catalog, 1_000) .await .unwrap(); drop(guard); @@ -306,7 +300,7 @@ mod tests { let config = make_config().await; let iox_object_store = &config.iox_object_store; - let (catalog, _state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; // create some files let mut to_remove = HashSet::default(); @@ -322,9 +316,7 @@ mod tests { } // run clean-up - let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 2) - .await - .unwrap(); + let files = get_unreferenced_parquet_files(&catalog, 2).await.unwrap(); assert_eq!(files.len(), 2); delete_files(&catalog, &files).await.unwrap(); @@ -334,9 +326,7 @@ mod tests { assert_eq!(leftover.len(), 1); // run clean-up again - let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 2) - .await - .unwrap(); + let files = get_unreferenced_parquet_files(&catalog, 2).await.unwrap(); assert_eq!(files.len(), 1); delete_files(&catalog, &files).await.unwrap(); diff --git a/parquet_file/src/catalog/core.rs b/parquet_file/src/catalog/core.rs index ccb1159488..db1a6adad3 100644 --- a/parquet_file/src/catalog/core.rs +++ b/parquet_file/src/catalog/core.rs @@ -169,6 +169,9 @@ pub struct PreservedCatalogConfig { /// Object store that backs the catalog pub(crate) iox_object_store: Arc, + /// Database name + pub(crate) db_name: String, + /// Fixed UUID for testing pub(crate) fixed_uuid: Option, @@ -179,10 +182,12 @@ pub struct PreservedCatalogConfig { impl PreservedCatalogConfig { pub fn new( iox_object_store: Arc, + db_name: String, time_provider: Arc, ) -> Self { Self { iox_object_store, + db_name, fixed_uuid: None, time_provider, } @@ -231,6 +236,9 @@ pub struct PreservedCatalog { /// Object store that backs this catalog. iox_object_store: Arc, + /// Database name + db_name: String, + /// If set, this UUID will be used for all transactions instead of a fresh UUIDv4. /// /// This can be useful for testing to achieve deterministic outputs. @@ -304,23 +312,16 @@ impl PreservedCatalog { /// /// An empty transaction will be used to mark the catalog start so that concurrent open but /// still-empty catalogs can easily be detected. - pub async fn new_empty( - db_name: &str, - config: PreservedCatalogConfig, - state_data: S::EmptyInput, - ) -> Result<(Self, S)> - where - S: CatalogState + Send + Sync, - { + pub async fn new_empty(config: PreservedCatalogConfig) -> Result { if Self::exists(&config.iox_object_store).await? { return Err(Error::AlreadyExists {}); } - let state = S::new_empty(db_name, state_data); let catalog = Self { previous_tkey: RwLock::new(None), transaction_semaphore: Semaphore::new(1), iox_object_store: config.iox_object_store, + db_name: config.db_name, fixed_uuid: config.fixed_uuid, time_provider: config.time_provider, }; @@ -333,18 +334,14 @@ impl PreservedCatalog { .map_err(Box::new) .context(CommitError)?; - Ok((catalog, state)) + Ok(catalog) } /// Load existing catalog from store, if it exists. /// /// Loading starts at the latest checkpoint or -- if none exists -- at transaction `0`. /// Transactions before that point are neither verified nor are they required to exist. - pub async fn load( - db_name: &str, - config: PreservedCatalogConfig, - state_data: S::EmptyInput, - ) -> Result> + pub async fn load(config: PreservedCatalogConfig, mut state: S) -> Result> where S: CatalogState + Send + Sync, { @@ -408,10 +405,6 @@ impl PreservedCatalog { return Ok(None); } - // setup empty state - let mut state = S::new_empty(db_name, state_data); - let mut last_tkey = None; - // detect replay start let start_revision = last_checkpoint.unwrap_or(0); @@ -419,6 +412,7 @@ impl PreservedCatalog { let max_revision = max_revision.expect("transactions list is not empty here"); // read and replay delta revisions + let mut last_tkey = None; for rev in start_revision..=max_revision { let uuid = transactions.get(&rev).context(MissingTransaction { revision_counter: rev, @@ -448,6 +442,7 @@ impl PreservedCatalog { previous_tkey: RwLock::new(last_tkey), transaction_semaphore: Semaphore::new(1), iox_object_store: config.iox_object_store, + db_name: config.db_name, fixed_uuid: config.fixed_uuid, time_provider: config.time_provider, }, @@ -486,6 +481,7 @@ impl PreservedCatalog { pub fn config(&self) -> PreservedCatalogConfig { PreservedCatalogConfig { iox_object_store: Arc::clone(&self.iox_object_store), + db_name: self.db_name.clone(), fixed_uuid: self.fixed_uuid, time_provider: Arc::clone(&self.time_provider), } @@ -1074,11 +1070,9 @@ mod tests { use super::*; use crate::catalog::test_helpers::{ break_catalog_with_weird_version, create_delete_predicate, exists, load_err, load_ok, - new_empty, TestCatalogState, DB_NAME, - }; - use crate::test_utils::{ - chunk_addr, make_config, make_iox_object_store, make_metadata, TestSize, + make_config, new_empty, TestCatalogState, }; + use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize}; #[tokio::test] async fn test_create_empty() { @@ -1364,7 +1358,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_debug() { let config = make_config().await; - let (catalog, _state) = new_empty(config).await; + let catalog = new_empty(config).await; let mut t = catalog.open_transaction().await; // open transaction @@ -1722,7 +1716,8 @@ mod tests { let config = make_config().await; let iox_object_store = &config.iox_object_store; - let (catalog, mut state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; + let mut state = TestCatalogState::default(); { let mut t = catalog.open_transaction().await; @@ -1879,7 +1874,8 @@ mod tests { async fn assert_single_catalog_inmem_works(config: PreservedCatalogConfig) -> TestTrace { let iox_object_store = &config.iox_object_store; - let (catalog, mut state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; + let mut state = TestCatalogState::default(); // track all the intermediate results let mut trace = TestTrace::new(); @@ -2002,7 +1998,7 @@ mod tests { new_empty(config.clone()).await; - let res = PreservedCatalog::new_empty::(DB_NAME, config, ()).await; + let res = PreservedCatalog::new_empty(config).await; assert_eq!(res.unwrap_err().to_string(), "Catalog already exists"); } @@ -2056,7 +2052,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_revision_counter() { let config = make_config().await; - let (catalog, _state) = new_empty(config).await; + let catalog = new_empty(config).await; let t = catalog.open_transaction().await; assert_eq!(t.revision_counter(), 1); @@ -2065,7 +2061,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_uuid() { let config = make_config().await; - let (catalog, _state) = new_empty(config).await; + let catalog = new_empty(config).await; let mut t = catalog.open_transaction().await; t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().as_bytes().to_vec().into(); @@ -2272,7 +2268,8 @@ mod tests { assert!(!exists(iox_object_store).await); - let (catalog, state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; + let state = TestCatalogState::default(); // delete transaction file let tkey = catalog.previous_tkey.read().unwrap(); diff --git a/parquet_file/src/catalog/dump.rs b/parquet_file/src/catalog/dump.rs index cbdab846a5..4790fc91e9 100644 --- a/parquet_file/src/catalog/dump.rs +++ b/parquet_file/src/catalog/dump.rs @@ -221,11 +221,9 @@ mod tests { use super::*; use crate::{ catalog::{ - core::PreservedCatalog, - interface::CatalogParquetInfo, - test_helpers::{TestCatalogState, DB_NAME}, + core::PreservedCatalog, interface::CatalogParquetInfo, test_helpers::make_config, }, - test_utils::{chunk_addr, make_config, make_metadata, TestSize}, + test_utils::{chunk_addr, make_metadata, TestSize}, }; use time::Time; use uuid::Uuid; @@ -241,10 +239,7 @@ mod tests { let iox_object_store = &config.iox_object_store; // build catalog with some data - let (catalog, _state) = - PreservedCatalog::new_empty::(DB_NAME, config.clone(), ()) - .await - .unwrap(); + let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap(); { let mut transaction = catalog.open_transaction().await; @@ -361,10 +356,7 @@ File { let iox_object_store = &config.iox_object_store; // build catalog with some data - let (catalog, _state) = - PreservedCatalog::new_empty::(DB_NAME, config.clone(), ()) - .await - .unwrap(); + let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap(); { let mut transaction = catalog.open_transaction().await; diff --git a/parquet_file/src/catalog/interface.rs b/parquet_file/src/catalog/interface.rs index 1c0cb0c52f..8910f3a5e7 100644 --- a/parquet_file/src/catalog/interface.rs +++ b/parquet_file/src/catalog/interface.rs @@ -98,14 +98,6 @@ pub enum CatalogStateRemoveError { /// Abstraction over how the in-memory state of the catalog works. pub trait CatalogState { - /// Input to create a new empty instance. - /// - /// See [`new_empty`](Self::new_empty) for details. - type EmptyInput: Send; - - /// Create empty state w/o any known files. - fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self; - /// Add parquet file to state. fn add( &mut self, diff --git a/parquet_file/src/catalog/prune.rs b/parquet_file/src/catalog/prune.rs index c9c002f28f..b4b7efb408 100644 --- a/parquet_file/src/catalog/prune.rs +++ b/parquet_file/src/catalog/prune.rs @@ -125,9 +125,9 @@ mod tests { catalog::{ core::PreservedCatalog, interface::CheckpointData, - test_helpers::{load_ok, new_empty}, + test_helpers::{load_ok, make_config, new_empty}, }, - test_utils::{make_config, make_iox_object_store}, + test_utils::make_iox_object_store, }; use super::*; @@ -166,7 +166,7 @@ mod tests { let iox_object_store = &config.iox_object_store; - let (catalog, _state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; create_transaction(&catalog).await; create_transaction_and_checkpoint(&catalog).await; @@ -194,7 +194,7 @@ mod tests { let iox_object_store = &config.iox_object_store; - let (catalog, _state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; create_transaction(&catalog).await; create_transaction_and_checkpoint(&catalog).await; @@ -229,7 +229,7 @@ mod tests { let config = make_config().await; let iox_object_store = &config.iox_object_store; - let (catalog, _state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; create_transaction(&catalog).await; create_transaction_and_checkpoint(&catalog).await; create_transaction(&catalog).await; diff --git a/parquet_file/src/catalog/rebuild.rs b/parquet_file/src/catalog/rebuild.rs index cfa7ffee1d..b8a0f2299d 100644 --- a/parquet_file/src/catalog/rebuild.rs +++ b/parquet_file/src/catalog/rebuild.rs @@ -69,9 +69,8 @@ pub type Result = std::result::Result; /// - **Metadata Read Failure:** There is a parquet file with metadata that cannot be read. Set /// `ignore_metadata_read_failure` to `true` to ignore these cases. pub async fn rebuild_catalog( - db_name: &str, config: PreservedCatalogConfig, - catalog_empty_input: S::EmptyInput, + mut state: S, ignore_metadata_read_failure: bool, ) -> Result<(PreservedCatalog, S)> where @@ -81,10 +80,9 @@ where let files = collect_files(&config.iox_object_store, ignore_metadata_read_failure).await?; // create new empty catalog - let (catalog, mut state) = - PreservedCatalog::new_empty::(db_name, config.clone(), catalog_empty_input) - .await - .context(NewEmptyFailure)?; + let catalog = PreservedCatalog::new_empty(config.clone()) + .await + .context(NewEmptyFailure)?; // create single transaction with all files if !files.is_empty() { @@ -174,13 +172,11 @@ mod tests { use crate::{ catalog::{ core::PreservedCatalog, - test_helpers::{exists, new_empty, TestCatalogState, DB_NAME}, + test_helpers::{exists, make_config, new_empty, TestCatalogState}, }, metadata::IoxMetadata, storage::{MemWriter, Storage}, - test_utils::{ - create_partition_and_database_checkpoint, make_config, make_record_batch, TestSize, - }, + test_utils::{create_partition_and_database_checkpoint, make_record_batch, TestSize}, }; use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder}; use datafusion::physical_plan::SendableRecordBatchStream; @@ -196,7 +192,8 @@ mod tests { let db_name = Arc::from("db1"); // build catalog with some data - let (catalog, mut state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; + let mut state = TestCatalogState::default(); { let mut transaction = catalog.open_transaction().await; @@ -237,7 +234,7 @@ mod tests { PreservedCatalog::wipe(iox_object_store).await.unwrap(); // rebuild - let (catalog, state) = rebuild_catalog::(DB_NAME, config, (), false) + let (catalog, state) = rebuild_catalog(config, TestCatalogState::default(), false) .await .unwrap(); @@ -256,7 +253,7 @@ mod tests { let config = make_config().await; // build empty catalog - let (catalog, _state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; // wipe catalog drop(catalog); @@ -265,7 +262,7 @@ mod tests { .unwrap(); // rebuild - let (catalog, state) = rebuild_catalog::(DB_NAME, config, (), false) + let (catalog, state) = rebuild_catalog(config, TestCatalogState::default(), false) .await .unwrap(); @@ -293,15 +290,14 @@ mod tests { PreservedCatalog::wipe(iox_object_store).await.unwrap(); // rebuild (do not ignore errors) - let res = rebuild_catalog::(DB_NAME, config.clone(), (), false).await; + let res = rebuild_catalog(config.clone(), TestCatalogState::default(), false).await; assert!(dbg!(res.unwrap_err().to_string()) .starts_with("Cannot read IOx metadata from parquet file")); // rebuild (ignore errors) - let (catalog, state) = - rebuild_catalog::(DB_NAME, config.clone(), (), true) - .await - .unwrap(); + let (catalog, state) = rebuild_catalog(config.clone(), TestCatalogState::default(), true) + .await + .unwrap(); assert!(state.files().next().is_none()); assert_eq!(catalog.revision_counter(), 0); } @@ -320,7 +316,7 @@ mod tests { let iox_object_store = &config.iox_object_store; // build catalog with some data (2 transactions + initial empty one) - let (catalog, _state) = new_empty(config.clone()).await; + let catalog = new_empty(config.clone()).await; assert_eq!(catalog.revision_counter(), 0); // wipe catalog @@ -328,7 +324,7 @@ mod tests { PreservedCatalog::wipe(iox_object_store).await.unwrap(); // rebuild - let catalog = rebuild_catalog::(DB_NAME, config.clone(), (), false) + let catalog = rebuild_catalog(config.clone(), TestCatalogState::default(), false) .await .unwrap(); drop(catalog); diff --git a/parquet_file/src/catalog/test_helpers.rs b/parquet_file/src/catalog/test_helpers.rs index e53c04740b..6008e6cb95 100644 --- a/parquet_file/src/catalog/test_helpers.rs +++ b/parquet_file/src/catalog/test_helpers.rs @@ -1,7 +1,6 @@ -use crate::catalog::core::PreservedCatalogConfig; use crate::{ catalog::{ - core::PreservedCatalog, + core::{PreservedCatalog, PreservedCatalogConfig}, interface::{ CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError, CheckpointData, ChunkAddrWithoutDatabase, @@ -12,7 +11,7 @@ use crate::{ }, }, metadata::IoxParquetMetaData, - test_utils::{chunk_addr, make_config, make_metadata, TestSize}, + test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize}, }; use data_types::{chunk_metadata::ChunkId, timestamp::TimestampRange}; use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath}; @@ -146,14 +145,6 @@ impl TestCatalogState { } impl CatalogState for TestCatalogState { - type EmptyInput = (); - - fn new_empty(_db_name: &str, _data: Self::EmptyInput) -> Self { - Self { - tables: HashMap::new(), - } - } - fn add( &mut self, _iox_object_store: Arc, @@ -222,21 +213,21 @@ pub async fn exists(iox_object_store: &Arc) -> bool { pub async fn load_ok( config: PreservedCatalogConfig, ) -> Option<(PreservedCatalog, TestCatalogState)> { - PreservedCatalog::load(DB_NAME, config, ()).await.unwrap() + PreservedCatalog::load(config, TestCatalogState::default()) + .await + .unwrap() } /// Load a `PreservedCatalog` and unwrap the error, expecting the operation to fail pub async fn load_err(config: PreservedCatalogConfig) -> crate::catalog::core::Error { - PreservedCatalog::load::(DB_NAME, config, ()) + PreservedCatalog::load(config, TestCatalogState::default()) .await .unwrap_err() } /// Create a new empty catalog with the TestCatalogState, expecting the operation to succeed -pub async fn new_empty(config: PreservedCatalogConfig) -> (PreservedCatalog, TestCatalogState) { - PreservedCatalog::new_empty(DB_NAME, config, ()) - .await - .unwrap() +pub async fn new_empty(config: PreservedCatalogConfig) -> PreservedCatalog { + PreservedCatalog::new_empty(config).await.unwrap() } /// Break preserved catalog by moving one of the transaction files into a weird unknown version. @@ -265,17 +256,12 @@ fn get_tkey(catalog: &PreservedCatalog) -> TransactionKey { /// Torture-test implementations for [`CatalogState`]. /// /// A function to extract [`CheckpointData`] from the [`CatalogState`] must be provided. -pub async fn assert_catalog_state_implementation(state_data: S::EmptyInput, f: F) +pub async fn assert_catalog_state_implementation(mut state: S, f: F) where S: CatalogState + Debug + Send + Sync, F: Fn(&S) -> CheckpointData + Send, { - // empty state let config = make_config().await; - let (_catalog, mut state) = - PreservedCatalog::new_empty::(DB_NAME, config.clone(), state_data) - .await - .unwrap(); // The expected state of the catalog let mut expected_files: HashMap)> = @@ -675,14 +661,21 @@ pub fn create_delete_predicate(value: i64) -> Arc { }) } +/// Creates a new [`PreservedCatalogConfig`] with an in-memory object store +pub async fn make_config() -> PreservedCatalogConfig { + let iox_object_store = make_iox_object_store().await; + let time_provider = Arc::new(time::SystemProvider::new()); + PreservedCatalogConfig::new(iox_object_store, DB_NAME.to_string(), time_provider) +} + #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_catalog_state() { - assert_catalog_state_implementation::( - (), + assert_catalog_state_implementation( + TestCatalogState::default(), TestCatalogState::checkpoint_data, ) .await; diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs index 74596aa6d2..7994637558 100644 --- a/parquet_file/src/test_utils.rs +++ b/parquet_file/src/test_utils.rs @@ -1,4 +1,3 @@ -use crate::catalog::core::PreservedCatalogConfig; use crate::{ chunk::{self, ChunkMetrics, ParquetChunk}, metadata::{IoxMetadata, IoxParquetMetaData}, @@ -868,13 +867,6 @@ pub async fn make_iox_object_store() -> Arc { ) } -/// Creates a new [`PreservedCatalogConfig`] with an in-memory object store -pub async fn make_config() -> PreservedCatalogConfig { - let iox_object_store = make_iox_object_store().await; - let time_provider = Arc::new(time::SystemProvider::new()); - PreservedCatalogConfig::new(iox_object_store, time_provider) -} - pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec) -> Vec { let mut record_batches = vec![]; diff --git a/server/src/db.rs b/server/src/db.rs index b6235efc2f..f808e7d4c9 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -919,8 +919,7 @@ impl Db { self: &Arc, ) -> std::result::Result<(), parquet_file::catalog::cleanup::Error> { let guard = self.cleanup_lock.write().await; - let files = - get_unreferenced_parquet_files(&self.name(), &self.preserved_catalog, 1_000).await?; + let files = get_unreferenced_parquet_files(&self.preserved_catalog, 1_000).await?; drop(guard); delete_parquet_files(&self.preserved_catalog, &files).await diff --git a/server/src/db/load.rs b/server/src/db/load.rs index 34c8984d0d..dbccbdf851 100644 --- a/server/src/db/load.rs +++ b/server/src/db/load.rs @@ -60,9 +60,13 @@ pub async fn load_or_create_preserved_catalog( ) -> Result<(PreservedCatalog, Catalog, Option)> { // first try to load existing catalogs match PreservedCatalog::load( - db_name, - PreservedCatalogConfig::new(Arc::clone(&iox_object_store), Arc::clone(&time_provider)), - LoaderEmptyInput::new( + PreservedCatalogConfig::new( + Arc::clone(&iox_object_store), + db_name.to_string(), + Arc::clone(&time_provider), + ), + Loader::new( + db_name, Arc::clone(&metric_registry), Arc::clone(&time_provider), skip_replay, @@ -133,19 +137,19 @@ pub async fn create_preserved_catalog( time_provider: Arc, skip_replay: bool, ) -> Result<(PreservedCatalog, Catalog, Option)> { - let config = PreservedCatalogConfig::new(iox_object_store, Arc::clone(&time_provider)); + let config = PreservedCatalogConfig::new( + iox_object_store, + db_name.to_string(), + Arc::clone(&time_provider), + ); - let (preserved_catalog, loader) = PreservedCatalog::new_empty( - db_name, - config, - LoaderEmptyInput::new(metric_registry, time_provider, skip_replay), - ) - .await - .context(CannotCreateCatalog)?; + let preserved_catalog = PreservedCatalog::new_empty(config) + .await + .context(CannotCreateCatalog)?; let Loader { catalog, planner, .. - } = loader; + } = Loader::new(db_name, metric_registry, time_provider, skip_replay); let plan = planner .map(|planner| planner.build()) .transpose() @@ -153,28 +157,6 @@ pub async fn create_preserved_catalog( Ok((preserved_catalog, catalog, plan)) } -/// All input required to create an empty [`Loader`] -#[derive(Debug)] -struct LoaderEmptyInput { - metric_registry: Arc<::metric::Registry>, - time_provider: Arc, - skip_replay: bool, -} - -impl LoaderEmptyInput { - fn new( - metric_registry: Arc, - time_provider: Arc, - skip_replay: bool, - ) -> Self { - Self { - metric_registry, - time_provider, - skip_replay, - } - } -} - /// Helper to track data during catalog loading. #[derive(Debug)] struct Loader { @@ -183,23 +165,24 @@ struct Loader { metric_registry: Arc, } -impl CatalogState for Loader { - type EmptyInput = LoaderEmptyInput; - - fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self { - let catalog = Catalog::new( - Arc::from(db_name), - Arc::clone(&data.metric_registry), - Arc::clone(&data.time_provider), - ); +impl Loader { + fn new( + db_name: &str, + metric_registry: Arc, + time_provider: Arc, + skip_replay: bool, + ) -> Self { + let catalog = Catalog::new(Arc::from(db_name), metric_registry, time_provider); Self { catalog, - planner: (!data.skip_replay).then(ReplayPlanner::new), + planner: (!skip_replay).then(ReplayPlanner::new), metric_registry: Arc::new(Default::default()), } } +} +impl CatalogState for Loader { fn add( &mut self, iox_object_store: Arc, @@ -348,10 +331,13 @@ mod tests { .await .unwrap(), ); - let config = - PreservedCatalogConfig::new(Arc::clone(&iox_object_store), Arc::clone(&time_provider)); + let config = PreservedCatalogConfig::new( + Arc::clone(&iox_object_store), + db_name.to_string(), + Arc::clone(&time_provider), + ); - let (preserved_catalog, _catalog) = new_empty(config).await; + let preserved_catalog = new_empty(config).await; parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog) .await; @@ -373,12 +359,12 @@ mod tests { #[tokio::test] async fn test_catalog_state() { - let empty_input = LoaderEmptyInput { - metric_registry: Default::default(), - time_provider: Arc::new(time::SystemProvider::new()), - skip_replay: false, - }; - assert_catalog_state_implementation::(empty_input, checkpoint_data_from_loader) - .await; + let loader = Loader::new( + "db1", + Default::default(), + Arc::new(time::SystemProvider::new()), + false, + ); + assert_catalog_state_implementation(loader, checkpoint_data_from_loader).await; } } diff --git a/server/src/lib.rs b/server/src/lib.rs index cc61533d99..5413dd5aae 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -2207,6 +2207,7 @@ mod tests { let config = PreservedCatalogConfig::new( catalog_broken.iox_object_store().unwrap(), + db_name_catalog_broken.to_string(), Arc::clone(application.time_provider()), ); @@ -2292,6 +2293,7 @@ mod tests { let config = PreservedCatalogConfig::new( non_existing_iox_object_store, + db_name_non_existing.to_string(), Arc::clone(application.time_provider()), ); new_empty(config).await; @@ -2390,8 +2392,11 @@ mod tests { .unwrap(), ); - let config = - PreservedCatalogConfig::new(iox_object_store, Arc::clone(application.time_provider())); + let config = PreservedCatalogConfig::new( + iox_object_store, + db_name.to_string(), + Arc::clone(application.time_provider()), + ); // create catalog new_empty(config).await;