Merge pull request #2824 from influxdata/crepererum/cleanup_pres_catalog_interace

refactor: clean up preserved catalog interface
pull/24376/head
kodiakhq[bot] 2021-10-13 14:57:43 +00:00 committed by GitHub
commit e07461b250
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 138 additions and 196 deletions

View File

@ -54,18 +54,19 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// The exclusive access can be dropped after this method has returned and before calling
/// [`delete_files`].
pub async fn get_unreferenced_parquet_files(
db_name: &str,
catalog: &PreservedCatalog,
max_files: usize,
) -> Result<Vec<ParquetFilePath>> {
let iox_object_store = catalog.iox_object_store();
let all_known = {
// replay catalog transactions to track ALL (even dropped) files that are referenced
let (_catalog, state) =
PreservedCatalog::load::<TracerCatalogState>(db_name, catalog.config(), ())
.await
.context(CatalogLoadError)?
.expect("catalog gone while reading it?");
let (_catalog, state) = PreservedCatalog::load::<TracerCatalogState>(
catalog.config(),
TracerCatalogState::default(),
)
.await
.context(CatalogLoadError)?
.expect("catalog gone while reading it?");
state.files.into_inner()
};
@ -120,19 +121,12 @@ pub async fn delete_files(catalog: &PreservedCatalog, files: &[ParquetFilePath])
}
/// Catalog state that traces all used parquet files.
///
/// `get_unreferenced_parquet_files` replays the catalog transactions into an instance of this
/// state and afterwards takes the collected paths out of it via `files.into_inner()`.
#[derive(Debug, Default)]
struct TracerCatalogState {
/// Set of parquet file paths collected by this state, kept behind a `Mutex`.
files: Mutex<HashSet<ParquetFilePath>>,
}
impl CatalogState for TracerCatalogState {
type EmptyInput = ();
fn new_empty(_db_name: &str, _data: Self::EmptyInput) -> Self {
Self {
files: Default::default(),
}
}
fn add(
&mut self,
_iox_object_store: Arc<IoxObjectStore>,
@ -161,8 +155,8 @@ impl CatalogState for TracerCatalogState {
mod tests {
use super::*;
use crate::{
catalog::test_helpers::{new_empty, DB_NAME},
test_utils::{chunk_addr, make_config, make_metadata, TestSize},
catalog::test_helpers::{make_config, new_empty},
test_utils::{chunk_addr, make_metadata, TestSize},
};
use std::{collections::HashSet, sync::Arc};
use tokio::sync::RwLock;
@ -171,10 +165,10 @@ mod tests {
async fn test_cleanup_empty() {
let config = make_config().await;
let (catalog, _state) = new_empty(config).await;
let catalog = new_empty(config).await;
// run clean-up
let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000)
let files = get_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
delete_files(&catalog, &files).await.unwrap();
@ -185,7 +179,7 @@ mod tests {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let (catalog, _state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
// create some data
let mut paths_keep = vec![];
@ -229,7 +223,7 @@ mod tests {
}
// run clean-up
let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000)
let files = get_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
delete_files(&catalog, &files).await.unwrap();
@ -253,7 +247,7 @@ mod tests {
let iox_object_store = &config.iox_object_store;
let lock: RwLock<()> = Default::default();
let (catalog, _state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
// try multiple times to provoke a conflict
for i in 0..100 {
@ -287,7 +281,7 @@ mod tests {
},
async {
let guard = lock.write().await;
let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000)
let files = get_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
drop(guard);
@ -306,7 +300,7 @@ mod tests {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let (catalog, _state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
// create some files
let mut to_remove = HashSet::default();
@ -322,9 +316,7 @@ mod tests {
}
// run clean-up
let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 2)
.await
.unwrap();
let files = get_unreferenced_parquet_files(&catalog, 2).await.unwrap();
assert_eq!(files.len(), 2);
delete_files(&catalog, &files).await.unwrap();
@ -334,9 +326,7 @@ mod tests {
assert_eq!(leftover.len(), 1);
// run clean-up again
let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 2)
.await
.unwrap();
let files = get_unreferenced_parquet_files(&catalog, 2).await.unwrap();
assert_eq!(files.len(), 1);
delete_files(&catalog, &files).await.unwrap();

View File

@ -169,6 +169,9 @@ pub struct PreservedCatalogConfig {
/// Object store that backs the catalog
pub(crate) iox_object_store: Arc<IoxObjectStore>,
/// Database name
pub(crate) db_name: String,
/// Fixed UUID for testing
pub(crate) fixed_uuid: Option<Uuid>,
@ -179,10 +182,12 @@ pub struct PreservedCatalogConfig {
impl PreservedCatalogConfig {
pub fn new(
iox_object_store: Arc<IoxObjectStore>,
db_name: String,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
Self {
iox_object_store,
db_name,
fixed_uuid: None,
time_provider,
}
@ -231,6 +236,9 @@ pub struct PreservedCatalog {
/// Object store that backs this catalog.
iox_object_store: Arc<IoxObjectStore>,
/// Database name
db_name: String,
/// If set, this UUID will be used for all transactions instead of a fresh UUIDv4.
///
/// This can be useful for testing to achieve deterministic outputs.
@ -304,23 +312,16 @@ impl PreservedCatalog {
///
/// An empty transaction will be used to mark the catalog start so that concurrently open but
/// still-empty catalogs can easily be detected.
pub async fn new_empty<S>(
db_name: &str,
config: PreservedCatalogConfig,
state_data: S::EmptyInput,
) -> Result<(Self, S)>
where
S: CatalogState + Send + Sync,
{
pub async fn new_empty(config: PreservedCatalogConfig) -> Result<Self> {
if Self::exists(&config.iox_object_store).await? {
return Err(Error::AlreadyExists {});
}
let state = S::new_empty(db_name, state_data);
let catalog = Self {
previous_tkey: RwLock::new(None),
transaction_semaphore: Semaphore::new(1),
iox_object_store: config.iox_object_store,
db_name: config.db_name,
fixed_uuid: config.fixed_uuid,
time_provider: config.time_provider,
};
@ -333,18 +334,14 @@ impl PreservedCatalog {
.map_err(Box::new)
.context(CommitError)?;
Ok((catalog, state))
Ok(catalog)
}
/// Load existing catalog from store, if it exists.
///
/// Loading starts at the latest checkpoint or -- if none exists -- at transaction `0`.
/// Transactions before that point are neither verified nor are they required to exist.
pub async fn load<S>(
db_name: &str,
config: PreservedCatalogConfig,
state_data: S::EmptyInput,
) -> Result<Option<(Self, S)>>
pub async fn load<S>(config: PreservedCatalogConfig, mut state: S) -> Result<Option<(Self, S)>>
where
S: CatalogState + Send + Sync,
{
@ -408,10 +405,6 @@ impl PreservedCatalog {
return Ok(None);
}
// setup empty state
let mut state = S::new_empty(db_name, state_data);
let mut last_tkey = None;
// detect replay start
let start_revision = last_checkpoint.unwrap_or(0);
@ -419,6 +412,7 @@ impl PreservedCatalog {
let max_revision = max_revision.expect("transactions list is not empty here");
// read and replay delta revisions
let mut last_tkey = None;
for rev in start_revision..=max_revision {
let uuid = transactions.get(&rev).context(MissingTransaction {
revision_counter: rev,
@ -448,6 +442,7 @@ impl PreservedCatalog {
previous_tkey: RwLock::new(last_tkey),
transaction_semaphore: Semaphore::new(1),
iox_object_store: config.iox_object_store,
db_name: config.db_name,
fixed_uuid: config.fixed_uuid,
time_provider: config.time_provider,
},
@ -486,6 +481,7 @@ impl PreservedCatalog {
pub fn config(&self) -> PreservedCatalogConfig {
PreservedCatalogConfig {
iox_object_store: Arc::clone(&self.iox_object_store),
db_name: self.db_name.clone(),
fixed_uuid: self.fixed_uuid,
time_provider: Arc::clone(&self.time_provider),
}
@ -1074,11 +1070,9 @@ mod tests {
use super::*;
use crate::catalog::test_helpers::{
break_catalog_with_weird_version, create_delete_predicate, exists, load_err, load_ok,
new_empty, TestCatalogState, DB_NAME,
};
use crate::test_utils::{
chunk_addr, make_config, make_iox_object_store, make_metadata, TestSize,
make_config, new_empty, TestCatalogState,
};
use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize};
#[tokio::test]
async fn test_create_empty() {
@ -1364,7 +1358,7 @@ mod tests {
#[tokio::test]
async fn test_transaction_handle_debug() {
let config = make_config().await;
let (catalog, _state) = new_empty(config).await;
let catalog = new_empty(config).await;
let mut t = catalog.open_transaction().await;
// open transaction
@ -1722,7 +1716,8 @@ mod tests {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let (catalog, mut state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
let mut state = TestCatalogState::default();
{
let mut t = catalog.open_transaction().await;
@ -1879,7 +1874,8 @@ mod tests {
async fn assert_single_catalog_inmem_works(config: PreservedCatalogConfig) -> TestTrace {
let iox_object_store = &config.iox_object_store;
let (catalog, mut state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
let mut state = TestCatalogState::default();
// track all the intermediate results
let mut trace = TestTrace::new();
@ -2002,7 +1998,7 @@ mod tests {
new_empty(config.clone()).await;
let res = PreservedCatalog::new_empty::<TestCatalogState>(DB_NAME, config, ()).await;
let res = PreservedCatalog::new_empty(config).await;
assert_eq!(res.unwrap_err().to_string(), "Catalog already exists");
}
@ -2056,7 +2052,7 @@ mod tests {
#[tokio::test]
async fn test_transaction_handle_revision_counter() {
let config = make_config().await;
let (catalog, _state) = new_empty(config).await;
let catalog = new_empty(config).await;
let t = catalog.open_transaction().await;
assert_eq!(t.revision_counter(), 1);
@ -2065,7 +2061,7 @@ mod tests {
#[tokio::test]
async fn test_transaction_handle_uuid() {
let config = make_config().await;
let (catalog, _state) = new_empty(config).await;
let catalog = new_empty(config).await;
let mut t = catalog.open_transaction().await;
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().as_bytes().to_vec().into();
@ -2272,7 +2268,8 @@ mod tests {
assert!(!exists(iox_object_store).await);
let (catalog, state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
let state = TestCatalogState::default();
// delete transaction file
let tkey = catalog.previous_tkey.read().unwrap();

View File

@ -221,11 +221,9 @@ mod tests {
use super::*;
use crate::{
catalog::{
core::PreservedCatalog,
interface::CatalogParquetInfo,
test_helpers::{TestCatalogState, DB_NAME},
core::PreservedCatalog, interface::CatalogParquetInfo, test_helpers::make_config,
},
test_utils::{chunk_addr, make_config, make_metadata, TestSize},
test_utils::{chunk_addr, make_metadata, TestSize},
};
use time::Time;
use uuid::Uuid;
@ -241,10 +239,7 @@ mod tests {
let iox_object_store = &config.iox_object_store;
// build catalog with some data
let (catalog, _state) =
PreservedCatalog::new_empty::<TestCatalogState>(DB_NAME, config.clone(), ())
.await
.unwrap();
let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap();
{
let mut transaction = catalog.open_transaction().await;
@ -361,10 +356,7 @@ File {
let iox_object_store = &config.iox_object_store;
// build catalog with some data
let (catalog, _state) =
PreservedCatalog::new_empty::<TestCatalogState>(DB_NAME, config.clone(), ())
.await
.unwrap();
let catalog = PreservedCatalog::new_empty(config.clone()).await.unwrap();
{
let mut transaction = catalog.open_transaction().await;

View File

@ -98,14 +98,6 @@ pub enum CatalogStateRemoveError {
/// Abstraction over how the in-memory state of the catalog works.
pub trait CatalogState {
/// Input to create a new empty instance.
///
/// See [`new_empty`](Self::new_empty) for details.
type EmptyInput: Send;
/// Create empty state w/o any known files.
fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self;
/// Add parquet file to state.
fn add(
&mut self,

View File

@ -125,9 +125,9 @@ mod tests {
catalog::{
core::PreservedCatalog,
interface::CheckpointData,
test_helpers::{load_ok, new_empty},
test_helpers::{load_ok, make_config, new_empty},
},
test_utils::{make_config, make_iox_object_store},
test_utils::make_iox_object_store,
};
use super::*;
@ -166,7 +166,7 @@ mod tests {
let iox_object_store = &config.iox_object_store;
let (catalog, _state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
create_transaction(&catalog).await;
create_transaction_and_checkpoint(&catalog).await;
@ -194,7 +194,7 @@ mod tests {
let iox_object_store = &config.iox_object_store;
let (catalog, _state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
create_transaction(&catalog).await;
create_transaction_and_checkpoint(&catalog).await;
@ -229,7 +229,7 @@ mod tests {
let config = make_config().await;
let iox_object_store = &config.iox_object_store;
let (catalog, _state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
create_transaction(&catalog).await;
create_transaction_and_checkpoint(&catalog).await;
create_transaction(&catalog).await;

View File

@ -69,9 +69,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// - **Metadata Read Failure:** There is a parquet file with metadata that cannot be read. Set
/// `ignore_metadata_read_failure` to `true` to ignore these cases.
pub async fn rebuild_catalog<S>(
db_name: &str,
config: PreservedCatalogConfig,
catalog_empty_input: S::EmptyInput,
mut state: S,
ignore_metadata_read_failure: bool,
) -> Result<(PreservedCatalog, S)>
where
@ -81,10 +80,9 @@ where
let files = collect_files(&config.iox_object_store, ignore_metadata_read_failure).await?;
// create new empty catalog
let (catalog, mut state) =
PreservedCatalog::new_empty::<S>(db_name, config.clone(), catalog_empty_input)
.await
.context(NewEmptyFailure)?;
let catalog = PreservedCatalog::new_empty(config.clone())
.await
.context(NewEmptyFailure)?;
// create single transaction with all files
if !files.is_empty() {
@ -174,13 +172,11 @@ mod tests {
use crate::{
catalog::{
core::PreservedCatalog,
test_helpers::{exists, new_empty, TestCatalogState, DB_NAME},
test_helpers::{exists, make_config, new_empty, TestCatalogState},
},
metadata::IoxMetadata,
storage::{MemWriter, Storage},
test_utils::{
create_partition_and_database_checkpoint, make_config, make_record_batch, TestSize,
},
test_utils::{create_partition_and_database_checkpoint, make_record_batch, TestSize},
};
use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder};
use datafusion::physical_plan::SendableRecordBatchStream;
@ -196,7 +192,8 @@ mod tests {
let db_name = Arc::from("db1");
// build catalog with some data
let (catalog, mut state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
let mut state = TestCatalogState::default();
{
let mut transaction = catalog.open_transaction().await;
@ -237,7 +234,7 @@ mod tests {
PreservedCatalog::wipe(iox_object_store).await.unwrap();
// rebuild
let (catalog, state) = rebuild_catalog::<TestCatalogState>(DB_NAME, config, (), false)
let (catalog, state) = rebuild_catalog(config, TestCatalogState::default(), false)
.await
.unwrap();
@ -256,7 +253,7 @@ mod tests {
let config = make_config().await;
// build empty catalog
let (catalog, _state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
// wipe catalog
drop(catalog);
@ -265,7 +262,7 @@ mod tests {
.unwrap();
// rebuild
let (catalog, state) = rebuild_catalog::<TestCatalogState>(DB_NAME, config, (), false)
let (catalog, state) = rebuild_catalog(config, TestCatalogState::default(), false)
.await
.unwrap();
@ -293,15 +290,14 @@ mod tests {
PreservedCatalog::wipe(iox_object_store).await.unwrap();
// rebuild (do not ignore errors)
let res = rebuild_catalog::<TestCatalogState>(DB_NAME, config.clone(), (), false).await;
let res = rebuild_catalog(config.clone(), TestCatalogState::default(), false).await;
assert!(dbg!(res.unwrap_err().to_string())
.starts_with("Cannot read IOx metadata from parquet file"));
// rebuild (ignore errors)
let (catalog, state) =
rebuild_catalog::<TestCatalogState>(DB_NAME, config.clone(), (), true)
.await
.unwrap();
let (catalog, state) = rebuild_catalog(config.clone(), TestCatalogState::default(), true)
.await
.unwrap();
assert!(state.files().next().is_none());
assert_eq!(catalog.revision_counter(), 0);
}
@ -320,7 +316,7 @@ mod tests {
let iox_object_store = &config.iox_object_store;
// build catalog with some data (2 transactions + initial empty one)
let (catalog, _state) = new_empty(config.clone()).await;
let catalog = new_empty(config.clone()).await;
assert_eq!(catalog.revision_counter(), 0);
// wipe catalog
@ -328,7 +324,7 @@ mod tests {
PreservedCatalog::wipe(iox_object_store).await.unwrap();
// rebuild
let catalog = rebuild_catalog::<TestCatalogState>(DB_NAME, config.clone(), (), false)
let catalog = rebuild_catalog(config.clone(), TestCatalogState::default(), false)
.await
.unwrap();
drop(catalog);

View File

@ -1,7 +1,6 @@
use crate::catalog::core::PreservedCatalogConfig;
use crate::{
catalog::{
core::PreservedCatalog,
core::{PreservedCatalog, PreservedCatalogConfig},
interface::{
CatalogParquetInfo, CatalogState, CatalogStateAddError, CatalogStateRemoveError,
CheckpointData, ChunkAddrWithoutDatabase,
@ -12,7 +11,7 @@ use crate::{
},
},
metadata::IoxParquetMetaData,
test_utils::{chunk_addr, make_config, make_metadata, TestSize},
test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize},
};
use data_types::{chunk_metadata::ChunkId, timestamp::TimestampRange};
use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
@ -146,14 +145,6 @@ impl TestCatalogState {
}
impl CatalogState for TestCatalogState {
type EmptyInput = ();
fn new_empty(_db_name: &str, _data: Self::EmptyInput) -> Self {
Self {
tables: HashMap::new(),
}
}
fn add(
&mut self,
_iox_object_store: Arc<IoxObjectStore>,
@ -222,21 +213,21 @@ pub async fn exists(iox_object_store: &Arc<IoxObjectStore>) -> bool {
pub async fn load_ok(
config: PreservedCatalogConfig,
) -> Option<(PreservedCatalog, TestCatalogState)> {
PreservedCatalog::load(DB_NAME, config, ()).await.unwrap()
PreservedCatalog::load(config, TestCatalogState::default())
.await
.unwrap()
}
/// Load a `PreservedCatalog` and unwrap the error, expecting the operation to fail
pub async fn load_err(config: PreservedCatalogConfig) -> crate::catalog::core::Error {
PreservedCatalog::load::<TestCatalogState>(DB_NAME, config, ())
PreservedCatalog::load(config, TestCatalogState::default())
.await
.unwrap_err()
}
/// Create a new empty catalog with the TestCatalogState, expecting the operation to succeed
pub async fn new_empty(config: PreservedCatalogConfig) -> (PreservedCatalog, TestCatalogState) {
PreservedCatalog::new_empty(DB_NAME, config, ())
.await
.unwrap()
pub async fn new_empty(config: PreservedCatalogConfig) -> PreservedCatalog {
PreservedCatalog::new_empty(config).await.unwrap()
}
/// Break preserved catalog by moving one of the transaction files into a weird unknown version.
@ -265,17 +256,12 @@ fn get_tkey(catalog: &PreservedCatalog) -> TransactionKey {
/// Torture-test implementations for [`CatalogState`].
///
/// A function to extract [`CheckpointData`] from the [`CatalogState`] must be provided.
pub async fn assert_catalog_state_implementation<S, F>(state_data: S::EmptyInput, f: F)
pub async fn assert_catalog_state_implementation<S, F>(mut state: S, f: F)
where
S: CatalogState + Debug + Send + Sync,
F: Fn(&S) -> CheckpointData + Send,
{
// empty state
let config = make_config().await;
let (_catalog, mut state) =
PreservedCatalog::new_empty::<S>(DB_NAME, config.clone(), state_data)
.await
.unwrap();
// The expected state of the catalog
let mut expected_files: HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)> =
@ -675,14 +661,21 @@ pub fn create_delete_predicate(value: i64) -> Arc<DeletePredicate> {
})
}
/// Builds a [`PreservedCatalogConfig`] that uses an in-memory object store, [`DB_NAME`] as the
/// database name and a fresh `time::SystemProvider` as the time provider.
pub async fn make_config() -> PreservedCatalogConfig {
    PreservedCatalogConfig::new(
        make_iox_object_store().await,
        DB_NAME.to_string(),
        Arc::new(time::SystemProvider::new()),
    )
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_catalog_state() {
assert_catalog_state_implementation::<TestCatalogState, _>(
(),
assert_catalog_state_implementation(
TestCatalogState::default(),
TestCatalogState::checkpoint_data,
)
.await;

View File

@ -1,4 +1,3 @@
use crate::catalog::core::PreservedCatalogConfig;
use crate::{
chunk::{self, ChunkMetrics, ParquetChunk},
metadata::{IoxMetadata, IoxParquetMetaData},
@ -868,13 +867,6 @@ pub async fn make_iox_object_store() -> Arc<IoxObjectStore> {
)
}
/// Builds a [`PreservedCatalogConfig`] that uses an in-memory object store and a fresh
/// `time::SystemProvider` as the time provider.
pub async fn make_config() -> PreservedCatalogConfig {
    PreservedCatalogConfig::new(
        make_iox_object_store().await,
        Arc::new(time::SystemProvider::new()),
    )
}
pub fn read_data_from_parquet_data(schema: SchemaRef, parquet_data: Vec<u8>) -> Vec<RecordBatch> {
let mut record_batches = vec![];

View File

@ -919,8 +919,7 @@ impl Db {
self: &Arc<Self>,
) -> std::result::Result<(), parquet_file::catalog::cleanup::Error> {
let guard = self.cleanup_lock.write().await;
let files =
get_unreferenced_parquet_files(&self.name(), &self.preserved_catalog, 1_000).await?;
let files = get_unreferenced_parquet_files(&self.preserved_catalog, 1_000).await?;
drop(guard);
delete_parquet_files(&self.preserved_catalog, &files).await

View File

@ -60,9 +60,13 @@ pub async fn load_or_create_preserved_catalog(
) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
// first try to load existing catalogs
match PreservedCatalog::load(
db_name,
PreservedCatalogConfig::new(Arc::clone(&iox_object_store), Arc::clone(&time_provider)),
LoaderEmptyInput::new(
PreservedCatalogConfig::new(
Arc::clone(&iox_object_store),
db_name.to_string(),
Arc::clone(&time_provider),
),
Loader::new(
db_name,
Arc::clone(&metric_registry),
Arc::clone(&time_provider),
skip_replay,
@ -133,19 +137,19 @@ pub async fn create_preserved_catalog(
time_provider: Arc<dyn TimeProvider>,
skip_replay: bool,
) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
let config = PreservedCatalogConfig::new(iox_object_store, Arc::clone(&time_provider));
let config = PreservedCatalogConfig::new(
iox_object_store,
db_name.to_string(),
Arc::clone(&time_provider),
);
let (preserved_catalog, loader) = PreservedCatalog::new_empty(
db_name,
config,
LoaderEmptyInput::new(metric_registry, time_provider, skip_replay),
)
.await
.context(CannotCreateCatalog)?;
let preserved_catalog = PreservedCatalog::new_empty(config)
.await
.context(CannotCreateCatalog)?;
let Loader {
catalog, planner, ..
} = loader;
} = Loader::new(db_name, metric_registry, time_provider, skip_replay);
let plan = planner
.map(|planner| planner.build())
.transpose()
@ -153,28 +157,6 @@ pub async fn create_preserved_catalog(
Ok((preserved_catalog, catalog, plan))
}
/// All input required to create an empty [`Loader`]
#[derive(Debug)]
struct LoaderEmptyInput {
/// Metric registry that is passed on to `Catalog::new` when the [`Loader`] is created.
metric_registry: Arc<::metric::Registry>,
/// Time provider that is passed on to `Catalog::new` when the [`Loader`] is created.
time_provider: Arc<dyn TimeProvider>,
/// When `true`, the [`Loader`] is created without a `ReplayPlanner`.
skip_replay: bool,
}
impl LoaderEmptyInput {
/// Bundles the given values into a [`LoaderEmptyInput`]; the arguments are stored as-is.
fn new(
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
skip_replay: bool,
) -> Self {
Self {
metric_registry,
time_provider,
skip_replay,
}
}
}
/// Helper to track data during catalog loading.
#[derive(Debug)]
struct Loader {
@ -183,23 +165,24 @@ struct Loader {
metric_registry: Arc<metric::Registry>,
}
impl CatalogState for Loader {
type EmptyInput = LoaderEmptyInput;
fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self {
let catalog = Catalog::new(
Arc::from(db_name),
Arc::clone(&data.metric_registry),
Arc::clone(&data.time_provider),
);
impl Loader {
fn new(
db_name: &str,
metric_registry: Arc<metric::Registry>,
time_provider: Arc<dyn TimeProvider>,
skip_replay: bool,
) -> Self {
let catalog = Catalog::new(Arc::from(db_name), metric_registry, time_provider);
Self {
catalog,
planner: (!data.skip_replay).then(ReplayPlanner::new),
planner: (!skip_replay).then(ReplayPlanner::new),
metric_registry: Arc::new(Default::default()),
}
}
}
impl CatalogState for Loader {
fn add(
&mut self,
iox_object_store: Arc<IoxObjectStore>,
@ -348,10 +331,13 @@ mod tests {
.await
.unwrap(),
);
let config =
PreservedCatalogConfig::new(Arc::clone(&iox_object_store), Arc::clone(&time_provider));
let config = PreservedCatalogConfig::new(
Arc::clone(&iox_object_store),
db_name.to_string(),
Arc::clone(&time_provider),
);
let (preserved_catalog, _catalog) = new_empty(config).await;
let preserved_catalog = new_empty(config).await;
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
.await;
@ -373,12 +359,12 @@ mod tests {
#[tokio::test]
async fn test_catalog_state() {
let empty_input = LoaderEmptyInput {
metric_registry: Default::default(),
time_provider: Arc::new(time::SystemProvider::new()),
skip_replay: false,
};
assert_catalog_state_implementation::<Loader, _>(empty_input, checkpoint_data_from_loader)
.await;
let loader = Loader::new(
"db1",
Default::default(),
Arc::new(time::SystemProvider::new()),
false,
);
assert_catalog_state_implementation(loader, checkpoint_data_from_loader).await;
}
}

View File

@ -2207,6 +2207,7 @@ mod tests {
let config = PreservedCatalogConfig::new(
catalog_broken.iox_object_store().unwrap(),
db_name_catalog_broken.to_string(),
Arc::clone(application.time_provider()),
);
@ -2292,6 +2293,7 @@ mod tests {
let config = PreservedCatalogConfig::new(
non_existing_iox_object_store,
db_name_non_existing.to_string(),
Arc::clone(application.time_provider()),
);
new_empty(config).await;
@ -2390,8 +2392,11 @@ mod tests {
.unwrap(),
);
let config =
PreservedCatalogConfig::new(iox_object_store, Arc::clone(application.time_provider()));
let config = PreservedCatalogConfig::new(
iox_object_store,
db_name.to_string(),
Arc::clone(application.time_provider()),
);
// create catalog
new_empty(config).await;