diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index 97b2217b61..3b9ae23537 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -19,9 +19,11 @@ use object_store::{ ObjectStore, ObjectStoreApi, }; use observability_deps::tracing::{info, warn}; +use parking_lot::RwLock; use parquet::file::metadata::ParquetMetaData; use prost::{DecodeError, EncodeError, Message}; use snafu::{OptionExt, ResultExt, Snafu}; +use tokio::sync::{Semaphore, SemaphorePermit}; use uuid::Uuid; /// Current version for serialized transactions. @@ -126,27 +128,124 @@ pub enum Error { #[snafu(display("Cannot decode parquet metadata: {}", source))] MetadataDecodingFailed { source: crate::metadata::Error }, + + #[snafu(display("Cannot parse path {:?}: {}", path, source), visibility(pub))] + PathParseFailed { + source: crate::storage::Error, + path: DirsAndFileName, + }, + + #[snafu( + display("Cannot read schema from {:?}: {}", path, source), + visibility(pub) + )] + SchemaReadFailed { + source: crate::storage::Error, + path: DirsAndFileName, + }, + + #[snafu( + display("Cannot read statistics from {:?}: {}", path, source), + visibility(pub) + )] + StatisticsReadFailed { + source: crate::metadata::Error, + path: DirsAndFileName, + }, + + #[snafu( + display("Catalog state failure when processing {:?}: {}", path, source), + visibility(pub) + )] + CatalogStateFailure { + source: Box, + path: DirsAndFileName, + }, } pub type Result = std::result::Result; -/// In-memory view of the preserved catalog. -/// -/// **NOTE: This is a temporary structure until this module is wired up to the real in-memory catalog!** -pub struct PreservedCatalog { +/// Struct containing all information that a catalog received for a new parquet file. +#[derive(Debug)] +pub struct CatalogParquetInfo { + /// Full path. + pub path: DirsAndFileName, + + /// Associated parquet metadata. + pub metadata: ParquetMetaData, +} + +/// Abstraction over how the in-memory state of the catalog works. +pub trait CatalogState { + /// Input to create a new empty instance. + /// + /// See [`new_empty`](Self::new_empty) for details. + type EmptyInput; + + /// Create empty state w/o any known files. + fn new_empty(data: Self::EmptyInput) -> Self; + + /// Opens a new state. + /// + /// Depending if the state implements full copy-on-write semantics, do one of the following: + /// + /// - clone state + /// - return a pointer to self (e.g. using an [`Arc`](std::sync::Arc)) + fn clone_or_keep(origin: &Arc) -> Arc; + + /// Add parquet file to state. + fn add( + &self, + object_store: Arc, + server_id: ServerId, + db_name: &str, + info: CatalogParquetInfo, + ) -> Result<()>; + + /// Remove parquet file from state. + fn remove(&self, path: DirsAndFileName) -> Result<()>; +} + +/// Inner mutable part of the preserved catalog. +struct PreservedCatalogInner +where + S: CatalogState, +{ previous_tkey: Option, - state: CatalogState, + state: Arc, +} + +/// In-memory view of the preserved catalog. +pub struct PreservedCatalog +where + S: CatalogState, +{ + inner: RwLock>, + transaction_semaphore: Semaphore, object_store: Arc, server_id: ServerId, db_name: String, } -impl PreservedCatalog { +impl PreservedCatalog +where + S: CatalogState, +{ /// Create new catalog w/o any data. - pub fn new_empty(object_store: Arc, server_id: ServerId, db_name: String) -> Self { - Self { + pub fn new_empty( + object_store: Arc, + server_id: ServerId, + db_name: String, + state_data: S::EmptyInput, + ) -> Self { + let inner = PreservedCatalogInner { previous_tkey: None, - state: CatalogState::new_empty(), + state: Arc::new(S::new_empty(state_data)), + }; + + Self { + inner: RwLock::new(inner), + transaction_semaphore: Semaphore::new(1), object_store, server_id, db_name, @@ -158,6 +257,7 @@ impl PreservedCatalog { object_store: Arc, server_id: ServerId, db_name: String, + state_data: S::EmptyInput, ) -> Result> { // parse all paths into revisions let list_path = transactions_path(&object_store, server_id, &db_name); @@ -210,7 +310,7 @@ impl PreservedCatalog { // read and replay revisions let max_revision = max_revision.expect("transactions list is not empty here"); - let mut state = CatalogState::new_empty(); + let mut state = Arc::new(CatalogState::new_empty(state_data)); let mut last_tkey = None; for rev in 0..=max_revision { let uuid = transactions.get(&rev).context(MissingTransaction { @@ -220,12 +320,13 @@ impl PreservedCatalog { revision_counter: rev, uuid: *uuid, }; + let tmp_state = S::clone_or_keep(&state); let transaction = OpenTransaction::load_and_apply( &object_store, server_id, &db_name, &tkey, - state.clone(), + tmp_state, &last_tkey, ) .await?; @@ -233,9 +334,14 @@ impl PreservedCatalog { state = transaction.next_state; } - Ok(Some(Self { + let inner = PreservedCatalogInner { previous_tkey: last_tkey, state, + }; + + Ok(Some(Self { + inner: RwLock::new(inner), + transaction_semaphore: Semaphore::new(1), object_store, server_id, db_name, @@ -243,24 +349,31 @@ impl PreservedCatalog { } /// Open a new transaction. - pub fn open_transaction(&mut self) -> TransactionHandle<'_> { - TransactionHandle::new(self) + pub async fn open_transaction(&self) -> TransactionHandle<'_, S> { + TransactionHandle::new(self).await } /// Return current state. - pub fn state(&self) -> &CatalogState { - &self.state + pub fn state(&self) -> Arc { + Arc::clone(&self.inner.read().state) } /// Get latest revision counter. /// /// This can be `None` for a newly created catalog. pub fn revision_counter(&self) -> Option { - self.previous_tkey.clone().map(|tkey| tkey.revision_counter) + self.inner + .read() + .previous_tkey + .clone() + .map(|tkey| tkey.revision_counter) } } -impl Debug for PreservedCatalog { +impl Debug for PreservedCatalog +where + S: CatalogState, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "PreservedCatalog{{..}}") } @@ -431,36 +544,27 @@ impl Display for TransactionKey { } } -/// In-memory catalog state, for testing. -#[derive(Clone, Debug)] -pub struct CatalogState { - pub parquet_files: HashMap, -} - -impl CatalogState { - /// Create new empty state w/o any tracked files. - fn new_empty() -> Self { - Self { - parquet_files: HashMap::new(), - } - } -} - /// Tracker for an open, uncommitted transaction. -struct OpenTransaction { - next_state: CatalogState, +struct OpenTransaction +where + S: CatalogState, +{ + next_state: Arc, proto: proto::Transaction, } -impl OpenTransaction { - fn new(catalog: &PreservedCatalog) -> Self { - let (revision_counter, previous_uuid) = match &catalog.previous_tkey { +impl OpenTransaction +where + S: CatalogState, +{ + fn new(catalog_inner: &PreservedCatalogInner) -> Self { + let (revision_counter, previous_uuid) = match &catalog_inner.previous_tkey { Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()), None => (0, String::new()), }; Self { - next_state: catalog.state.clone(), + next_state: S::clone_or_keep(&catalog_inner.state), proto: proto::Transaction { actions: vec![], version: TRANSACTION_VERSION, @@ -479,8 +583,11 @@ impl OpenTransaction { } fn handle_action( - state: &mut CatalogState, + state: &S, action: &proto::transaction::action::Action, + object_store: &Arc, + server_id: ServerId, + db_name: &str, ) -> Result<()> { match action { proto::transaction::action::Action::Upgrade(u) => { @@ -491,29 +598,18 @@ impl OpenTransaction { } proto::transaction::action::Action::AddParquet(a) => { let path = parse_dirs_and_filename(&a.path)?; - match state.parquet_files.entry(path) { - Occupied(o) => { - return Err(Error::ParquetFileAlreadyExists { - path: o.key().clone(), - }); - } - Vacant(v) => { - let metadata = thrift_to_parquet_metadata(&a.metadata) - .context(MetadataDecodingFailed)?; - v.insert(metadata); - } - } + let metadata = + thrift_to_parquet_metadata(&a.metadata).context(MetadataDecodingFailed)?; + state.add( + Arc::clone(object_store), + server_id, + db_name, + CatalogParquetInfo { path, metadata }, + )?; } proto::transaction::action::Action::RemoveParquet(a) => { let path = parse_dirs_and_filename(&a.path)?; - match state.parquet_files.entry(path) { - Occupied(o) => { - o.remove(); - } - Vacant(v) => { - return Err(Error::ParquetFileDoesNotExist { path: v.into_key() }); - } - } + state.remove(path)?; } }; Ok(()) @@ -522,18 +618,21 @@ impl OpenTransaction { fn handle_action_and_record( &mut self, action: proto::transaction::action::Action, + object_store: &Arc, + server_id: ServerId, + db_name: &str, ) -> Result<()> { - Self::handle_action(&mut self.next_state, &action)?; + Self::handle_action(&self.next_state, &action, object_store, server_id, db_name)?; self.proto.actions.push(proto::transaction::Action { action: Some(action), }); Ok(()) } - fn commit(mut self, catalog: &mut PreservedCatalog) { + fn commit(mut self, catalog_inner: &mut PreservedCatalogInner) { let tkey = self.tkey(); - std::mem::swap(&mut catalog.state, &mut self.next_state); - catalog.previous_tkey = Some(tkey); + std::mem::swap(&mut catalog_inner.state, &mut self.next_state); + catalog_inner.previous_tkey = Some(tkey); } async fn store( @@ -548,11 +647,11 @@ impl OpenTransaction { } async fn load_and_apply( - object_store: &ObjectStore, + object_store: &Arc, server_id: ServerId, db_name: &str, tkey: &TransactionKey, - mut state: CatalogState, + state: Arc, last_tkey: &Option, ) -> Result { // recover state from store @@ -599,7 +698,7 @@ impl OpenTransaction { // apply for action in &proto.actions { if let Some(action) = action.action.as_ref() { - Self::handle_action(&mut state, action)?; + Self::handle_action(&state, action, object_store, server_id, db_name)?; } } @@ -613,20 +712,45 @@ impl OpenTransaction { /// Handle for an open uncommitted transaction. /// /// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning. -pub struct TransactionHandle<'c> { - catalog: &'c mut PreservedCatalog, - transaction: Option, +pub struct TransactionHandle<'c, S> +where + S: CatalogState, +{ + catalog: &'c PreservedCatalog, + + // NOTE: The permit is technically used since we use it to reference the semaphore. It implements `drop` which we + // rely on. + #[allow(dead_code)] + permit: SemaphorePermit<'c>, + + transaction: Option>, } -impl<'c> TransactionHandle<'c> { - fn new(catalog: &'c mut PreservedCatalog) -> Self { - let transaction = OpenTransaction::new(catalog); +impl<'c, S> TransactionHandle<'c, S> +where + S: CatalogState, +{ + async fn new(catalog: &'c PreservedCatalog) -> TransactionHandle<'c, S> { + // first acquire semaphore (which is only being used for transactions), then get state lock + let permit = catalog + .transaction_semaphore + .acquire() + .await + .expect("semaphore should not be closed"); + let inner_guard = catalog.inner.write(); + + let transaction = OpenTransaction::new(&inner_guard); + + // free state for readers again + drop(inner_guard); + let tkey = transaction.tkey(); info!(?tkey, "transaction started"); Self { catalog, transaction: Some(transaction), + permit, } } @@ -647,7 +771,9 @@ impl<'c> TransactionHandle<'c> { let t = std::mem::take(&mut self.transaction) .expect("calling .commit on a closed transaction?!"); let tkey = t.tkey(); - t.commit(self.catalog); + let mut inner_guard = self.catalog.inner.write(); + t.commit(&mut inner_guard); + drop(inner_guard); info!(?tkey, "transaction committed"); Ok(()) @@ -664,13 +790,16 @@ impl<'c> TransactionHandle<'c> { self.transaction .as_mut() .expect("transaction handle w/o transaction?!") - .handle_action_and_record(proto::transaction::action::Action::AddParquet( - proto::AddParquet { + .handle_action_and_record( + proto::transaction::action::Action::AddParquet(proto::AddParquet { path: Some(unparse_dirs_and_filename(path)), metadata: parquet_metadata_to_thrift(metadata) .context(MetadataEncodingFailed)?, - }, - )) + }), + &self.catalog.object_store, + self.catalog.server_id, + &self.catalog.db_name, + ) } /// Remove a parquet file from the catalog. @@ -680,15 +809,21 @@ impl<'c> TransactionHandle<'c> { self.transaction .as_mut() .expect("transaction handle w/o transaction?!") - .handle_action_and_record(proto::transaction::action::Action::RemoveParquet( - proto::RemoveParquet { + .handle_action_and_record( + proto::transaction::action::Action::RemoveParquet(proto::RemoveParquet { path: Some(unparse_dirs_and_filename(path)), - }, - )) + }), + &self.catalog.object_store, + self.catalog.server_id, + &self.catalog.db_name, + ) } } -impl<'c> Debug for TransactionHandle<'c> { +impl<'c, S> Debug for TransactionHandle<'c, S> +where + S: CatalogState, +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self.transaction { Some(t) => write!(f, "TransactionHandle(open, {})", t.tkey()), @@ -697,7 +832,10 @@ impl<'c> Debug for TransactionHandle<'c> { } } -impl<'c> Drop for TransactionHandle<'c> { +impl<'c, S> Drop for TransactionHandle<'c, S> +where + S: CatalogState, +{ fn drop(&mut self) { if self.transaction.is_some() { warn!(?self, "dropped uncommitted transaction"); @@ -707,7 +845,7 @@ impl<'c> Drop for TransactionHandle<'c> { #[cfg(test)] mod tests { - use std::num::NonZeroU32; + use std::{cell::RefCell, num::NonZeroU32, ops::Deref}; use crate::{ metadata::{read_parquet_metadata_from_file, read_statistics_from_parquet_metadata}, @@ -718,6 +856,67 @@ mod tests { use super::*; + #[derive(Clone, Debug)] + struct TestCatalogStateInner { + pub parquet_files: HashMap, + } + + /// In-memory catalog state, for testing. + #[derive(Clone, Debug)] + struct TestCatalogState { + pub inner: RefCell, + } + + impl CatalogState for TestCatalogState { + type EmptyInput = (); + + fn new_empty(_data: Self::EmptyInput) -> Self { + Self { + inner: RefCell::new(TestCatalogStateInner { + parquet_files: HashMap::new(), + }), + } + } + + fn clone_or_keep(origin: &Arc) -> Arc { + Arc::new(origin.deref().clone()) + } + + fn add( + &self, + _object_store: Arc, + _server_id: ServerId, + _db_name: &str, + info: CatalogParquetInfo, + ) -> Result<()> { + match self.inner.borrow_mut().parquet_files.entry(info.path) { + Occupied(o) => { + return Err(Error::ParquetFileAlreadyExists { + path: o.key().clone(), + }); + } + Vacant(v) => { + v.insert(info.metadata); + } + } + + Ok(()) + } + + fn remove(&self, path: DirsAndFileName) -> Result<()> { + match self.inner.borrow_mut().parquet_files.entry(path) { + Occupied(o) => { + o.remove(); + } + Vacant(v) => { + return Err(Error::ParquetFileDoesNotExist { path: v.into_key() }); + } + } + + Ok(()) + } + } + #[tokio::test] async fn test_inmem_commit_semantics() { let object_store = make_object_store(); @@ -733,9 +932,14 @@ mod tests { #[tokio::test] async fn test_load_from_empty_store() { let object_store = make_object_store(); - let option = PreservedCatalog::load(object_store, make_server_id(), "db1".to_string()) - .await - .unwrap(); + let option = PreservedCatalog::::load( + object_store, + make_server_id(), + "db1".to_string(), + (), + ) + .await + .unwrap(); assert!(option.is_none()); } @@ -782,9 +986,14 @@ mod tests { create_empty_file(&object_store, &path).await; // no data present - let option = PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.clone()) - .await - .unwrap(); + let option = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.clone(), + (), + ) + .await + .unwrap(); assert!(option.is_none()); // can still write + read @@ -805,8 +1014,13 @@ mod tests { checked_delete(&object_store, &path).await; // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert!(matches!( res, Err(Error::MissingTransaction { @@ -833,8 +1047,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!( res.unwrap_err().to_string(), "Format version of transaction file for revision 0 is 42 but only [1] are supported" @@ -859,8 +1078,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!( res.unwrap_err().to_string(), "Wrong revision counter in transaction file: expected 0 but found 42" @@ -887,8 +1111,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!( res.unwrap_err().to_string(), format!( @@ -916,8 +1145,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!( res.unwrap_err().to_string(), "UUID required but not provided" @@ -942,8 +1176,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!( res.unwrap_err().to_string(), "Cannot parse UUID: invalid length: expected one of [36, 32], found 3" @@ -968,8 +1207,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!(res.unwrap_err().to_string(), "Wrong link to previous UUID in revision 0: expected None but found Some(00000000-0000-0000-0000-000000000000)"); } @@ -991,8 +1235,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!(res.unwrap_err().to_string(), format!("Wrong link to previous UUID in revision 1: expected Some({}) but found Some(00000000-0000-0000-0000-000000000000)", trace.tkeys[0].uuid)); } @@ -1014,8 +1263,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!( res.unwrap_err().to_string(), "Cannot parse UUID: invalid length: expected one of [36, 32], found 3" @@ -1025,9 +1279,13 @@ mod tests { #[tokio::test] async fn test_transaction_handle_debug() { let object_store = make_object_store(); - let mut catalog = - PreservedCatalog::new_empty(object_store, make_server_id(), "db1".to_string()); - let mut t = catalog.open_transaction(); + let catalog = PreservedCatalog::::new_empty( + object_store, + make_server_id(), + "db1".to_string(), + (), + ); + let mut t = catalog.open_transaction().await; // open transaction t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); @@ -1064,8 +1322,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; let (uuid1, uuid2) = if old_uuid < new_uuid { (old_uuid, new_uuid) } else { @@ -1098,8 +1361,13 @@ mod tests { .unwrap(); // loading catalog should fail now - let res = - PreservedCatalog::load(Arc::clone(&object_store), server_id, db_name.to_string()).await; + let res = PreservedCatalog::::load( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ) + .await; assert_eq!( res.unwrap_err().to_string(), "Upgrade path not implemented/supported: foo", @@ -1107,8 +1375,10 @@ mod tests { } /// Get sorted list of catalog files from state - fn get_catalog_parquet_files(state: &CatalogState) -> Vec<(String, ParquetMetaData)> { + fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, ParquetMetaData)> { let mut files: Vec<(String, ParquetMetaData)> = state + .inner + .borrow() .parquet_files .iter() .map(|(path, md)| (path.display(), md.clone())) @@ -1119,10 +1389,10 @@ mod tests { /// Assert that set of parquet files tracked by a catalog are identical to the given sorted list. fn assert_catalog_parquet_files( - catalog: &PreservedCatalog, + catalog: &PreservedCatalog, expected: &[(String, ParquetMetaData)], ) { - let actual = get_catalog_parquet_files(catalog.state()); + let actual = get_catalog_parquet_files(&catalog.state()); for ((actual_path, actual_md), (expected_path, expected_md)) in actual.iter().zip(expected.iter()) { @@ -1180,7 +1450,7 @@ mod tests { /// Result of [`assert_single_catalog_inmem_works`]. struct TestTrace { tkeys: Vec, - states: Vec, + states: Vec, } impl TestTrace { @@ -1191,9 +1461,10 @@ mod tests { } } - fn record(&mut self, catalog: &PreservedCatalog) { - self.tkeys.push(catalog.previous_tkey.clone().unwrap()); - self.states.push(catalog.state.clone()); + fn record(&mut self, catalog: &PreservedCatalog) { + self.tkeys + .push(catalog.inner.read().previous_tkey.clone().unwrap()); + self.states.push(catalog.state().deref().clone()); } } @@ -1202,8 +1473,12 @@ mod tests { server_id: ServerId, db_name: &str, ) -> TestTrace { - let mut catalog = - PreservedCatalog::new_empty(Arc::clone(&object_store), server_id, db_name.to_string()); + let catalog = PreservedCatalog::new_empty( + Arc::clone(&object_store), + server_id, + db_name.to_string(), + (), + ); // get some test metadata let metadata1 = make_metadata(object_store, "foo").await; @@ -1218,7 +1493,7 @@ mod tests { // fill catalog with examples { - let mut t = catalog.open_transaction(); + let mut t = catalog.open_transaction().await; t.add_parquet(&parsed_path!("test1"), &metadata1).unwrap(); t.add_parquet(&parsed_path!(["sub1"], "test1"), &metadata2) @@ -1244,7 +1519,7 @@ mod tests { // modify catalog with examples { - let mut t = catalog.open_transaction(); + let mut t = catalog.open_transaction().await; // "real" modifications t.add_parquet(&parsed_path!("test4"), &metadata1).unwrap(); @@ -1274,7 +1549,7 @@ mod tests { // uncommitted modifications have no effect { - let mut t = catalog.open_transaction(); + let mut t = catalog.open_transaction().await; t.add_parquet(&parsed_path!("test5"), &metadata1).unwrap(); t.remove_parquet(&parsed_path!(["sub1"], "test2")).unwrap(); @@ -1306,7 +1581,7 @@ mod tests { // load catalog from store and check replayed state let catalog = - PreservedCatalog::load(Arc::clone(object_store), server_id, db_name.to_string()) + PreservedCatalog::load(Arc::clone(object_store), server_id, db_name.to_string(), ()) .await .unwrap() .unwrap();