From 4a961694ecd8947094cc524d1c3aee9f06a4aecf Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 22 Jun 2021 09:50:50 +0200
Subject: [PATCH 1/4] refactor: make caller sync mem<>OS view during catalog transactions

This is for #1740. Greatly simplifies the integration of the persisted catalog into the DB.
---
 parquet_file/src/catalog.rs | 609 ++++++++++++++++--------------------
 parquet_file/src/cleanup.rs |  58 ++--
 parquet_file/src/rebuild.rs |  84 +++--
 server/src/db.rs            | 346 ++++++--------------
 4 files changed, 448 insertions(+), 649 deletions(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index a913686e1c..c0f82156b7 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -185,6 +185,9 @@ pub enum Error {
         expected: proto::transaction::Encoding,
         actual: proto::transaction::Encoding,
     },
+
+    #[snafu(display("Cannot commit transaction: {}", source))]
+    TransactionCommitError { source: CommitError },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -199,20 +202,6 @@ pub struct CatalogParquetInfo {
     pub metadata: Arc<IoxParquetMetaData>,
 }
 
-/// How a transaction ends.
-#[derive(Clone, Copy, Debug)]
-pub enum TransactionEnd {
-    /// Successful commit.
-    ///
-    /// All buffered/prepared actions must be materialized.
-    Commit,
-
-    /// Abort.
-    ///
-    /// All eagerly applied action must be rolled back.
-    Abort,
-}
-
 /// Abstraction over how the in-memory state of the catalog works.
 pub trait CatalogState {
     /// Input to create a new empty instance.
@@ -223,37 +212,11 @@ pub trait CatalogState {
     /// Create empty state w/o any known files.
     fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self;
 
-    /// Helper struct that can be used to remember/cache actions that happened during a transaction.
-    ///
-    /// This can be useful to implement one of the following systems to handle commits and aborts:
-    /// - **copy semantic:** the entire inner state is copied during transaction start and is committed at the end. This
-    ///   is esp. useful when state-copies are simple
-    /// - **action preperation:** Actions are prepared during the transaction, checked for validity, and are simply
-    ///   executed on commit.
-    /// - **action rollback:** Actions are eagerly executed during transaction and are rolled back when the transaction
-    ///   aborts.
-    ///
-    /// This type is created by [`transaction_begin`](Self::transaction_begin), will be modified by the actions, and
-    /// will be consumed by [`transaction_end`](Self::transaction_end).
-    type TransactionState: Send + Sync;
-
-    /// Hook that will be called at transaction start.
-    fn transaction_begin(origin: &Arc<Self>) -> Self::TransactionState;
-
-    /// Hook that will be called at transaction end.
-    ///
-    /// Note that this hook will be called for both successful commits and aborts.
-    fn transaction_end(tstate: Self::TransactionState, how: TransactionEnd) -> Arc<Self>;
-
     /// Add parquet file to state.
-    fn add(
-        tstate: &mut Self::TransactionState,
-        object_store: Arc<ObjectStore>,
-        info: CatalogParquetInfo,
-    ) -> Result<()>;
+    fn add(&mut self, object_store: Arc<ObjectStore>, info: CatalogParquetInfo) -> Result<()>;
 
     /// Remove parquet file from state.
-    fn remove(tstate: &mut Self::TransactionState, path: DirsAndFileName) -> Result<()>;
+    fn remove(&mut self, path: DirsAndFileName) -> Result<()>;
 }
 
 /// In-memory view of the preserved catalog.
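With the slimmed-down trait above, keeping the in-memory view in sync becomes the caller's job: each action is recorded once on the transaction handle (for the preserved, object-store side) and mirrored once on the `CatalogState` (for the in-memory side). A minimal sketch of the resulting calling convention, mirroring the test code changed later in this patch (`state`, `catalog`, `object_store`, `path`, and `metadata` are assumed to be in scope):

    // Sketch only: `state` is some caller-owned `CatalogState` implementation.
    let mut transaction = catalog.open_transaction().await;

    // Record the action for the preserved (object store) catalog...
    transaction.add_parquet(&path, &metadata).unwrap();

    // ...and mirror it in the in-memory view; the transaction no longer
    // applies it implicitly on commit.
    state
        .add(
            Arc::clone(&object_store),
            CatalogParquetInfo {
                path: path.clone(),
                metadata: Arc::new(metadata.clone()),
            },
        )
        .unwrap();

    transaction.commit(None).await.unwrap();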
@@ -373,8 +336,11 @@ impl PreservedCatalog { }; // add empty transaction - let transaction = catalog.open_transaction(state).await; - let state = transaction.commit(None).await?; + let transaction = catalog.open_transaction().await; + transaction + .commit(None) + .await + .context(TransactionCommitError)?; Ok((catalog, state)) } @@ -446,7 +412,7 @@ impl PreservedCatalog { } // setup empty state - let mut state = Arc::new(S::new_empty(&db_name, state_data)); + let mut state = S::new_empty(&db_name, state_data); let mut last_tkey = None; // detect replay start @@ -469,12 +435,12 @@ impl PreservedCatalog { } else { FileType::Transaction }; - state = OpenTransaction::load_and_apply( + OpenTransaction::load_and_apply( &object_store, server_id, &db_name, &tkey, - &state, + &mut state, &last_tkey, file_type, ) @@ -490,7 +456,7 @@ impl PreservedCatalog { server_id, db_name, }, - state, + Arc::new(state), ))) } @@ -500,24 +466,14 @@ impl PreservedCatalog { /// transaction handle is dropped. The newly created transaction will contain the state after `await` (esp. /// post-blocking). This system is fair, which means that transactions are given out in the order they were /// requested. - pub async fn open_transaction(&self, state: Arc) -> TransactionHandle<'_, S> - where - S: CatalogState + Send + Sync, - { - self.open_transaction_with_uuid(Uuid::new_v4(), state).await + pub async fn open_transaction(&self) -> TransactionHandle<'_> { + self.open_transaction_with_uuid(Uuid::new_v4()).await } /// Crate-private API to open an transaction with a specified UUID. Should only be used for catalog rebuilding or /// with a fresh V4-UUID! - pub(crate) async fn open_transaction_with_uuid( - &self, - uuid: Uuid, - state: Arc, - ) -> TransactionHandle<'_, S> - where - S: CatalogState + Send + Sync, - { - TransactionHandle::new(self, uuid, state).await + pub(crate) async fn open_transaction_with_uuid(&self, uuid: Uuid) -> TransactionHandle<'_> { + TransactionHandle::new(self, uuid).await } /// Get latest revision counter. @@ -805,27 +761,19 @@ impl Display for TransactionKey { } /// Tracker for an open, uncommitted transaction. -struct OpenTransaction -where - S: CatalogState + Send + Sync, -{ - tstate: S::TransactionState, +struct OpenTransaction { proto: proto::Transaction, } -impl OpenTransaction -where - S: CatalogState + Send + Sync, -{ +impl OpenTransaction { /// Private API to create new transaction, users should always use [`PreservedCatalog::open_transaction`]. - fn new(previous_tkey: &Option, uuid: Uuid, state: Arc) -> Self { + fn new(previous_tkey: &Option, uuid: Uuid) -> Self { let (revision_counter, previous_uuid) = match previous_tkey { Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()), None => (0, String::new()), }; Self { - tstate: S::transaction_begin(&state), proto: proto::Transaction { actions: vec![], version: TRANSACTION_VERSION, @@ -852,11 +800,14 @@ where /// Note that this method is primarily for replaying transactions and will NOT append the given action to the /// current transaction. If you also want to store the given action (e.g. during an in-progress transaction), use /// [`handle_action_and_record`](Self::handle_action_and_record). 
- fn handle_action( - tstate: &mut S::TransactionState, + fn handle_action( + state: &mut S, action: &proto::transaction::action::Action, object_store: &Arc, - ) -> Result<()> { + ) -> Result<()> + where + S: CatalogState, + { match action { proto::transaction::action::Action::Upgrade(u) => { UnsupportedUpgrade { @@ -871,49 +822,35 @@ where IoxParquetMetaData::from_thrift(&a.metadata).context(MetadataDecodingFailed)?; let metadata = Arc::new(metadata); - S::add( - tstate, + state.add( Arc::clone(object_store), CatalogParquetInfo { path, metadata }, )?; } proto::transaction::action::Action::RemoveParquet(a) => { let path = parse_dirs_and_filename(&a.path)?; - S::remove(tstate, path)?; + state.remove(path)?; } }; Ok(()) } - /// Similar to [`handle_action`](Self::handle_action) but this will also append the action to the current - /// transaction state. - fn handle_action_and_record( - &mut self, - action: proto::transaction::action::Action, - object_store: &Arc, - ) -> Result<()> { - Self::handle_action(&mut self.tstate, &action, object_store)?; + /// Record action to protobuf. + fn record_action(&mut self, action: proto::transaction::action::Action) { self.proto.actions.push(proto::transaction::Action { action: Some(action), }); - Ok(()) } /// Commit to mutable catalog and return previous transaction key. - fn commit( - self, - previous_tkey: &mut Option, - ) -> (Option, Arc) { + fn commit(self, previous_tkey: &mut Option) -> Option { let mut tkey = Some(self.tkey()); - let state = S::transaction_end(self.tstate, TransactionEnd::Commit); std::mem::swap(previous_tkey, &mut tkey); - (tkey, state) + tkey } /// Abort transaction - fn abort(self) -> Arc { - S::transaction_end(self.tstate, TransactionEnd::Abort) - } + fn abort(self) {} async fn store( &self, @@ -932,15 +869,18 @@ where Ok(()) } - async fn load_and_apply( + async fn load_and_apply( object_store: &Arc, server_id: ServerId, db_name: &str, tkey: &TransactionKey, - state: &Arc, + state: &mut S, last_tkey: &Option, file_type: FileType, - ) -> Result> { + ) -> Result<()> + where + S: CatalogState + Send, + { // recover state from store let path = file_path(object_store, server_id, db_name, tkey, file_type); let proto = load_transaction_proto(object_store, &path).await?; @@ -995,20 +935,14 @@ where }); } - // start transaction - let mut tstate = S::transaction_begin(state); - // apply for action in &proto.actions { if let Some(action) = action.action.as_ref() { - Self::handle_action(&mut tstate, action, object_store)?; + Self::handle_action(state, action, object_store)?; } } - // commit - let state = S::transaction_end(tstate, TransactionEnd::Commit); - - Ok(state) + Ok(()) } } @@ -1024,13 +958,19 @@ pub struct CheckpointData { pub files: HashMap>, } +#[derive(Debug, Snafu)] +pub enum CommitError { + #[snafu(display("Cannot write transaction: {}", source))] + CommitFailed { source: Box }, + + #[snafu(display("Cannot write checkpoint (transaction was written!): {}", source))] + CheckpointFailed { source: Box }, +} + /// Handle for an open uncommitted transaction. /// /// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning. -pub struct TransactionHandle<'c, S> -where - S: CatalogState + Send + Sync, -{ +pub struct TransactionHandle<'c> { catalog: &'c PreservedCatalog, // NOTE: The permit is technically used since we use it to reference the semaphore. 
It implements `drop` which we
@@ -1038,18 +978,11 @@ where
     #[allow(dead_code)]
     permit: SemaphorePermit<'c>,
 
-    transaction: Option<OpenTransaction<S>>,
+    transaction: Option<OpenTransaction>,
 }
 
-impl<'c, S> TransactionHandle<'c, S>
-where
-    S: CatalogState + Send + Sync,
-{
-    async fn new(
-        catalog: &'c PreservedCatalog,
-        uuid: Uuid,
-        state: Arc<S>,
-    ) -> TransactionHandle<'c, S> {
+impl<'c> TransactionHandle<'c> {
+    async fn new(catalog: &'c PreservedCatalog, uuid: Uuid) -> TransactionHandle<'c> {
         // first acquire semaphore (which is only being used for transactions), then get state lock
         let permit = catalog
             .transaction_semaphore
@@ -1058,7 +991,7 @@ where
             .expect("semaphore should not be closed");
 
         let previous_tkey_guard = catalog.previous_tkey.write();
-        let transaction = OpenTransaction::new(&previous_tkey_guard, uuid, state);
+        let transaction = OpenTransaction::new(&previous_tkey_guard, uuid);
 
         // free state for readers again
         drop(previous_tkey_guard);
@@ -1073,15 +1006,6 @@ where
         }
     }
 
-    /// Get current transaction state.
-    pub fn tstate(&self) -> &S::TransactionState {
-        &self
-            .transaction
-            .as_ref()
-            .expect("No transaction in progress?")
-            .tstate
-    }
-
     /// Get revision counter for this transaction.
     pub fn revision_counter(&self) -> u64 {
         self.transaction
@@ -1110,7 +1034,18 @@ where
     /// optimization to speed up transaction replay and allow pruning the history.
     ///
     /// Note that `checkpoint_data` must contain the state INCLUDING the to-be-committed transaction.
-    pub async fn commit(mut self, checkpoint_data: Option<CheckpointData>) -> Result<Arc<S>> {
+    ///
+    /// # Failure Handling
+    /// When this function returns with an error, the caller MUST react accordingly:
+    ///
+    /// - [`CommitError::CommitFailed`]: It MUST be assumed that the commit has failed and all actions recorded
+    ///   with this handle are NOT preserved.
+    /// - [`CommitError::CheckpointFailed`]: The actual transaction was written; only the checkpoint was not.
+    ///   It MUST be assumed that all actions recorded with this handle are preserved.
+    pub async fn commit(
+        mut self,
+        checkpoint_data: Option<CheckpointData>,
+    ) -> std::result::Result<(), CommitError> {
         let t = std::mem::take(&mut self.transaction)
             .expect("calling .commit on a closed transaction?!");
         let tkey = t.tkey();
@@ -1126,7 +1061,7 @@ where
         {
             Ok(()) => {
                 // commit to catalog
-                let (previous_tkey, state) = self.commit_inner(t);
+                let previous_tkey = self.commit_inner(t);
                 info!(?tkey, "transaction committed");
 
                 // maybe create a checkpoint
@@ -1136,15 +1071,19 @@ where
                     // NOTE: `inner_guard` will not be re-used here since it is a strong write lock and the checkpoint creation
                     //       only needs a read lock.
                     self.create_checkpoint(tkey, previous_tkey, checkpoint_data)
-                        .await?;
+                        .await
+                        .map_err(Box::new)
+                        .context(CheckpointFailed)?;
                 }
 
-                Ok(state)
+                Ok(())
             }
             Err(e) => {
                 warn!(?tkey, "failure while writing transaction, aborting");
                 t.abort();
-                Err(e)
+                Err(CommitError::CommitFailed {
+                    source: Box::new(e),
+                })
             }
         }
     }
@@ -1156,7 +1095,7 @@ where
     /// - the read-write guard for the inner catalog state should be limited in scope to avoid long write-locks
     /// - rustc seems to fold the guard into the async generator state even when we `drop` it quickly, making the
     ///   resulting future `!Send`. However tokio requires our futures to be `Send`.
- fn commit_inner(&self, t: OpenTransaction) -> (Option, Arc) { + fn commit_inner(&self, t: OpenTransaction) -> Option { let mut previous_tkey_guard = self.catalog.previous_tkey.write(); t.commit(&mut previous_tkey_guard) } @@ -1217,7 +1156,7 @@ where } /// Abort transaction w/o commit. - pub fn abort(mut self) -> Arc { + pub fn abort(mut self) { let t = std::mem::take(&mut self.transaction) .expect("calling .commit on a closed transaction?!"); t.abort() @@ -1234,13 +1173,14 @@ where self.transaction .as_mut() .expect("transaction handle w/o transaction?!") - .handle_action_and_record( - proto::transaction::action::Action::AddParquet(proto::AddParquet { + .record_action(proto::transaction::action::Action::AddParquet( + proto::AddParquet { path: Some(unparse_dirs_and_filename(path)), metadata: metadata.to_thrift().context(MetadataEncodingFailed)?, - }), - &self.catalog.object_store, - ) + }, + )); + + Ok(()) } /// Remove a parquet file from the catalog. @@ -1250,19 +1190,17 @@ where self.transaction .as_mut() .expect("transaction handle w/o transaction?!") - .handle_action_and_record( - proto::transaction::action::Action::RemoveParquet(proto::RemoveParquet { + .record_action(proto::transaction::action::Action::RemoveParquet( + proto::RemoveParquet { path: Some(unparse_dirs_and_filename(path)), - }), - &self.catalog.object_store, - ) + }, + )); + + Ok(()) } } -impl<'c, S> Debug for TransactionHandle<'c, S> -where - S: CatalogState + Send + Sync, -{ +impl<'c> Debug for TransactionHandle<'c> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self.transaction { Some(t) => write!(f, "TransactionHandle(open, {})", t.tkey()), @@ -1271,10 +1209,7 @@ where } } -impl<'c, S> Drop for TransactionHandle<'c, S> -where - S: CatalogState + Send + Sync, -{ +impl<'c> Drop for TransactionHandle<'c> { fn drop(&mut self) { if let Some(t) = self.transaction.take() { warn!(?self, "dropped uncommitted transaction, calling abort"); @@ -1289,7 +1224,7 @@ pub mod test_helpers { use crate::test_utils::{chunk_addr, make_metadata, make_object_store}; use super::*; - use std::{convert::TryFrom, ops::Deref}; + use std::convert::TryFrom; /// In-memory catalog state, for testing. 
#[derive(Clone, Debug)] @@ -1307,12 +1242,6 @@ pub mod test_helpers { } } - #[derive(Debug)] - pub struct TState { - pub old: Arc, - pub new: TestCatalogState, - } - impl CatalogState for TestCatalogState { type EmptyInput = (); @@ -1322,28 +1251,8 @@ pub mod test_helpers { } } - type TransactionState = TState; - - fn transaction_begin(origin: &Arc) -> Self::TransactionState { - Self::TransactionState { - old: Arc::clone(origin), - new: origin.deref().clone(), - } - } - - fn transaction_end(tstate: Self::TransactionState, how: TransactionEnd) -> Arc { - match how { - TransactionEnd::Abort => tstate.old, - TransactionEnd::Commit => Arc::new(tstate.new), - } - } - - fn add( - tstate: &mut Self::TransactionState, - _object_store: Arc, - info: CatalogParquetInfo, - ) -> Result<()> { - match tstate.new.parquet_files.entry(info.path) { + fn add(&mut self, _object_store: Arc, info: CatalogParquetInfo) -> Result<()> { + match self.parquet_files.entry(info.path) { Occupied(o) => { return Err(Error::ParquetFileAlreadyExists { path: o.key().clone(), @@ -1357,8 +1266,8 @@ pub mod test_helpers { Ok(()) } - fn remove(tstate: &mut Self::TransactionState, path: DirsAndFileName) -> Result<()> { - match tstate.new.parquet_files.entry(path) { + fn remove(&mut self, path: DirsAndFileName) -> Result<()> { + match self.parquet_files.entry(path) { Occupied(o) => { o.remove(); } @@ -1404,12 +1313,12 @@ pub mod test_helpers { /// A function to extract [`CheckpointData`] from the [`CatalogState`] must be provided. pub async fn assert_catalog_state_implementation(state_data: S::EmptyInput, f: F) where - S: CatalogState + Send + Sync, + S: CatalogState + Debug + Send + Sync, F: Fn(&S) -> CheckpointData + Send, { // empty state let object_store = make_object_store(); - let (catalog, mut state) = PreservedCatalog::new_empty::( + let (_catalog, state) = PreservedCatalog::new_empty::( Arc::clone(&object_store), ServerId::try_from(1).unwrap(), "db1".to_string(), @@ -1417,199 +1326,206 @@ pub mod test_helpers { ) .await .unwrap(); + let mut state = Arc::try_unwrap(state).unwrap(); let mut expected = HashMap::new(); - assert_checkpoint(state.as_ref(), &f, &expected); + assert_checkpoint(&state, &f, &expected); // add files let mut chunk_id_watermark = 5; { - let mut transaction = catalog.open_transaction(state).await; - for chunk_id in 0..chunk_id_watermark { let path = parsed_path!(format!("chunk_{}", chunk_id).as_ref()); let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(chunk_id)).await; - transaction.add_parquet(&path, &metadata).unwrap(); + state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata.clone()), + }, + ) + .unwrap(); expected.insert(path, Arc::new(metadata)); } - - state = transaction.commit(None).await.unwrap(); } - assert_checkpoint(state.as_ref(), &f, &expected); + assert_checkpoint(&state, &f, &expected); // remove files { - let mut transaction = catalog.open_transaction(state).await; - let path = parsed_path!("chunk_1"); - transaction.remove_parquet(&path).unwrap(); + state.remove(path.clone()).unwrap(); expected.remove(&path); - - state = transaction.commit(None).await.unwrap(); } - assert_checkpoint(state.as_ref(), &f, &expected); + assert_checkpoint(&state, &f, &expected); // add and remove in the same transaction { - let mut transaction = catalog.open_transaction(state).await; - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let (_, metadata) = make_metadata(&object_store, "ok", 
chunk_addr(chunk_id_watermark)).await; - transaction.add_parquet(&path, &metadata).unwrap(); - transaction.remove_parquet(&path).unwrap(); + state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata), + }, + ) + .unwrap(); + state.remove(path.clone()).unwrap(); chunk_id_watermark += 1; - - state = transaction.commit(None).await.unwrap(); } - assert_checkpoint(state.as_ref(), &f, &expected); + assert_checkpoint(&state, &f, &expected); // remove and add in the same transaction { - let mut transaction = catalog.open_transaction(state).await; - let path = parsed_path!("chunk_2"); let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(2)).await; - transaction.remove_parquet(&path).unwrap(); - transaction.add_parquet(&path, &metadata).unwrap(); - - state = transaction.commit(None).await.unwrap(); + state.remove(path.clone()).unwrap(); + state + .add( + Arc::clone(&&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata), + }, + ) + .unwrap(); } - assert_checkpoint(state.as_ref(), &f, &expected); + assert_checkpoint(&state, &f, &expected); // add, remove, add in the same transaction { - let mut transaction = catalog.open_transaction(state).await; - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(chunk_id_watermark)).await; - transaction.add_parquet(&path, &metadata).unwrap(); - transaction.remove_parquet(&path).unwrap(); - transaction.add_parquet(&path, &metadata).unwrap(); + state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata.clone()), + }, + ) + .unwrap(); + state.remove(path.clone()).unwrap(); + state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata.clone()), + }, + ) + .unwrap(); expected.insert(path, Arc::new(metadata)); chunk_id_watermark += 1; - - state = transaction.commit(None).await.unwrap(); } - assert_checkpoint(state.as_ref(), &f, &expected); + assert_checkpoint(&state, &f, &expected); // remove, add, remove in same transaction { - let mut transaction = catalog.open_transaction(state).await; - let path = parsed_path!("chunk_2"); let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(2)).await; - transaction.remove_parquet(&path).unwrap(); - transaction.add_parquet(&path, &metadata).unwrap(); - transaction.remove_parquet(&path).unwrap(); + state.remove(path.clone()).unwrap(); + state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata), + }, + ) + .unwrap(); + state.remove(path.clone()).unwrap(); expected.remove(&path); - - state = transaction.commit(None).await.unwrap(); } - assert_checkpoint(state.as_ref(), &f, &expected); + assert_checkpoint(&state, &f, &expected); // error handling, no real opt { - let mut transaction = catalog.open_transaction(state).await; - // already exists (should also not change the metadata) let path = parsed_path!("chunk_0"); let (_, metadata) = make_metadata(&object_store, "fail", chunk_addr(0)).await; - let err = transaction.add_parquet(&path, &metadata).unwrap_err(); + let err = state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata), + }, + ) + .unwrap_err(); assert!(matches!(err, Error::ParquetFileAlreadyExists { .. 
})); // does not exist let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let err = transaction.remove_parquet(&path).unwrap_err(); + let err = state.remove(path).unwrap_err(); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); chunk_id_watermark += 1; - - state = transaction.commit(None).await.unwrap(); } - assert_checkpoint(state.as_ref(), &f, &expected); + assert_checkpoint(&state, &f, &expected); // error handling, still something works { - let mut transaction = catalog.open_transaction(state).await; - // already exists (should also not change the metadata) let path = parsed_path!("chunk_0"); let (_, metadata) = make_metadata(&object_store, "fail", chunk_addr(0)).await; - let err = transaction.add_parquet(&path, &metadata).unwrap_err(); + let err = state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata.clone()), + }, + ) + .unwrap_err(); assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); // this transaction will still work let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(chunk_id_watermark)).await; - transaction.add_parquet(&path, &metadata).unwrap(); + state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata.clone()), + }, + ) + .unwrap(); expected.insert(path.clone(), Arc::new(metadata.clone())); chunk_id_watermark += 1; // recently added - let err = transaction.add_parquet(&path, &metadata).unwrap_err(); + let err = state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata), + }, + ) + .unwrap_err(); assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); // does not exist let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let err = transaction.remove_parquet(&path).unwrap_err(); + let err = state.remove(path).unwrap_err(); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); chunk_id_watermark += 1; // this still works let path = parsed_path!("chunk_3"); - transaction.remove_parquet(&path).unwrap(); + state.remove(path.clone()).unwrap(); expected.remove(&path); // recently removed - let err = transaction.remove_parquet(&path).unwrap_err(); + let err = state.remove(path).unwrap_err(); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. 
})); - - state = transaction.commit(None).await.unwrap(); - } - assert_checkpoint(state.as_ref(), &f, &expected); - - // transaction aborting - { - let mut transaction = catalog.open_transaction(Arc::clone(&state)).await; - - // add - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let (_, metadata) = - make_metadata(&object_store, "ok", chunk_addr(chunk_id_watermark)).await; - transaction.add_parquet(&path, &metadata).unwrap(); - chunk_id_watermark += 1; - - // remove - let path = parsed_path!("chunk_4"); - transaction.remove_parquet(&path).unwrap(); - - // add and remove - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let (_, metadata) = - make_metadata(&object_store, "ok", chunk_addr(chunk_id_watermark)).await; - transaction.add_parquet(&path, &metadata).unwrap(); - transaction.remove_parquet(&path).unwrap(); - chunk_id_watermark += 1; - } - assert_checkpoint(state.as_ref(), &f, &expected); - - // transaction aborting w/ errors - { - let mut transaction = catalog.open_transaction(Arc::clone(&state)).await; - - // already exists (should also not change the metadata) - let path = parsed_path!("chunk_0"); - let (_, metadata) = make_metadata(&object_store, "fail", chunk_addr(0)).await; - let err = transaction.add_parquet(&path, &metadata).unwrap_err(); - assert!(matches!(err, Error::ParquetFileAlreadyExists { .. })); - - // does not exist - let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); - let err = transaction.remove_parquet(&path).unwrap_err(); - assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); - chunk_id_watermark += 1; } + assert_checkpoint(&state, &f, &expected); // consume variable so that we can easily add tests w/o re-adding the final modification println!("{}", chunk_id_watermark); @@ -2165,7 +2081,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_debug() { let object_store = make_object_store(); - let (catalog, state) = PreservedCatalog::new_empty::( + let (catalog, _state) = PreservedCatalog::new_empty::( object_store, make_server_id(), "db1".to_string(), @@ -2173,7 +2089,7 @@ mod tests { ) .await .unwrap(); - let mut t = catalog.open_transaction(state).await; + let mut t = catalog.open_transaction().await; // open transaction t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); @@ -2574,7 +2490,7 @@ mod tests { let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await; // re-open catalog - let (catalog, mut state) = PreservedCatalog::load::( + let (catalog, state) = PreservedCatalog::load::( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -2583,23 +2499,26 @@ mod tests { .await .unwrap() .unwrap(); + let mut state = Arc::try_unwrap(state).unwrap(); // create empty transaction w/ checkpoint (the delta transaction file is not required for catalog loading) { - let transaction = catalog.open_transaction(state).await; - let checkpoint_data = Some(transaction.tstate().new.checkpoint_data()); - state = transaction.commit(checkpoint_data).await.unwrap(); + let transaction = catalog.open_transaction().await; + let checkpoint_data = Some(state.checkpoint_data()); + transaction.commit(checkpoint_data).await.unwrap(); } trace.record(&catalog, &state, false); // create another transaction on-top that adds a file (this transaction will be required to load the full state) { - let mut transaction = catalog.open_transaction(state).await; - transaction - .add_parquet(&parsed_path!("last_one"), &metadata) - .unwrap(); - 
let checkpoint_data = Some(transaction.tstate().new.checkpoint_data()); - state = transaction.commit(checkpoint_data).await.unwrap(); + let mut transaction = catalog.open_transaction().await; + let path = parsed_path!("last_one"); + state + .parquet_files + .insert(path.clone(), Arc::new(metadata.clone())); + transaction.add_parquet(&path, &metadata).unwrap(); + let checkpoint_data = Some(state.checkpoint_data()); + transaction.commit(checkpoint_data).await.unwrap(); } trace.record(&catalog, &state, false); @@ -2751,7 +2670,7 @@ mod tests { server_id: ServerId, db_name: &str, ) -> TestTrace { - let (catalog, mut state) = PreservedCatalog::new_empty( + let (catalog, state) = PreservedCatalog::new_empty( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -2759,6 +2678,7 @@ mod tests { ) .await .unwrap(); + let mut state = Arc::try_unwrap(state).unwrap(); // get some test metadata let (_, metadata1) = make_metadata(object_store, "foo", chunk_addr(1)).await; @@ -2774,17 +2694,33 @@ mod tests { // fill catalog with examples { - let mut t = catalog.open_transaction(state).await; + let mut t = catalog.open_transaction().await; - t.add_parquet(&parsed_path!("test1"), &metadata1).unwrap(); - t.add_parquet(&parsed_path!(["sub1"], "test1"), &metadata2) - .unwrap(); - t.add_parquet(&parsed_path!(["sub1"], "test2"), &metadata2) - .unwrap(); - t.add_parquet(&parsed_path!(["sub2"], "test1"), &metadata1) - .unwrap(); + let path = parsed_path!("test1"); + state + .parquet_files + .insert(path.clone(), Arc::new(metadata1.clone())); + t.add_parquet(&path, &metadata1).unwrap(); - state = t.commit(None).await.unwrap(); + let path = parsed_path!(["sub1"], "test1"); + state + .parquet_files + .insert(path.clone(), Arc::new(metadata2.clone())); + t.add_parquet(&path, &metadata2).unwrap(); + + let path = parsed_path!(["sub1"], "test2"); + state + .parquet_files + .insert(path.clone(), Arc::new(metadata2.clone())); + t.add_parquet(&path, &metadata2).unwrap(); + + let path = parsed_path!(["sub2"], "test1"); + state + .parquet_files + .insert(path.clone(), Arc::new(metadata1.clone())); + t.add_parquet(&path, &metadata1).unwrap(); + + t.commit(None).await.unwrap(); } assert_eq!(catalog.revision_counter(), 1); assert_catalog_parquet_files( @@ -2800,21 +2736,20 @@ mod tests { // modify catalog with examples { - let mut t = catalog.open_transaction(state).await; + let mut t = catalog.open_transaction().await; // "real" modifications - t.add_parquet(&parsed_path!("test4"), &metadata1).unwrap(); - t.remove_parquet(&parsed_path!("test1")).unwrap(); + let path = parsed_path!("test4"); + state + .parquet_files + .insert(path.clone(), Arc::new(metadata1.clone())); + t.add_parquet(&path, &metadata1).unwrap(); - // wrong modifications - t.add_parquet(&parsed_path!(["sub1"], "test2"), &metadata2) - .expect_err("add file twice should error"); - t.remove_parquet(&parsed_path!("does_not_exist")) - .expect_err("removing unknown file should error"); - t.remove_parquet(&parsed_path!("test1")) - .expect_err("removing twice should error"); + let path = parsed_path!("test1"); + state.parquet_files.remove(&path); + t.remove_parquet(&path).unwrap(); - state = t.commit(None).await.unwrap(); + t.commit(None).await.unwrap(); } assert_eq!(catalog.revision_counter(), 2); assert_catalog_parquet_files( @@ -2830,7 +2765,7 @@ mod tests { // uncommitted modifications have no effect { - let mut t = catalog.open_transaction(Arc::clone(&state)).await; + let mut t = catalog.open_transaction().await; t.add_parquet(&parsed_path!("test5"), 
&metadata1).unwrap(); t.remove_parquet(&parsed_path!(["sub1"], "test2")).unwrap(); @@ -3019,7 +2954,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_revision_counter() { let object_store = make_object_store(); - let (catalog, state) = PreservedCatalog::new_empty::( + let (catalog, _state) = PreservedCatalog::new_empty::( object_store, make_server_id(), "db1".to_string(), @@ -3027,7 +2962,7 @@ mod tests { ) .await .unwrap(); - let t = catalog.open_transaction(state).await; + let t = catalog.open_transaction().await; assert_eq!(t.revision_counter(), 1); } @@ -3035,7 +2970,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_uuid() { let object_store = make_object_store(); - let (catalog, state) = PreservedCatalog::new_empty::( + let (catalog, _state) = PreservedCatalog::new_empty::( object_store, make_server_id(), "db1".to_string(), @@ -3043,7 +2978,7 @@ mod tests { ) .await .unwrap(); - let mut t = catalog.open_transaction(state).await; + let mut t = catalog.open_transaction().await; t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); assert_eq!(t.uuid(), Uuid::nil()); @@ -3205,7 +3140,7 @@ mod tests { let db_name = "db1"; let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await; - let (catalog, mut state) = PreservedCatalog::load::( + let (catalog, state) = PreservedCatalog::load::( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -3217,9 +3152,9 @@ mod tests { // create empty transaction w/ checkpoint { - let transaction = catalog.open_transaction(state).await; - let checkpoint_data = Some(transaction.tstate().new.checkpoint_data()); - state = transaction.commit(checkpoint_data).await.unwrap(); + let transaction = catalog.open_transaction().await; + let checkpoint_data = Some(state.checkpoint_data()); + transaction.commit(checkpoint_data).await.unwrap(); } trace.record(&catalog, &state, false); @@ -3322,8 +3257,8 @@ mod tests { // create empty transaction w/ checkpoint { - let transaction = catalog.open_transaction(state).await; - let checkpoint_data = Some(transaction.tstate().new.checkpoint_data()); + let transaction = catalog.open_transaction().await; + let checkpoint_data = Some(state.checkpoint_data()); transaction.commit(checkpoint_data).await.unwrap(); } diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index 2ac4c31ce8..66e02aca52 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -6,7 +6,7 @@ use std::{ }; use crate::{ - catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog, TransactionEnd}, + catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog}, storage::data_location, }; use futures::TryStreamExt; @@ -39,17 +39,13 @@ pub type Result = std::result::Result; /// /// This will hold the transaction lock while the list of files is being gathered. To limit the time the lock is held /// use `max_files` which will limit the number of files to delete in this cleanup round. -pub async fn cleanup_unreferenced_parquet_files( +pub async fn cleanup_unreferenced_parquet_files( catalog: &PreservedCatalog, - state: Arc, max_files: usize, -) -> Result<()> -where - S: CatalogState + Send + Sync, -{ +) -> Result<()> { // Create a transaction to prevent parallel modifications of the catalog. This avoids that we delete files there // that are about to get added to the catalog. 
- let transaction = catalog.open_transaction(state).await; + let transaction = catalog.open_transaction().await; let store = catalog.object_store(); let server_id = catalog.server_id(); @@ -130,35 +126,19 @@ impl CatalogState for TracerCatalogState { } } - type TransactionState = Arc; - - fn transaction_begin(origin: &Arc) -> Self::TransactionState { - // no copy - Arc::clone(origin) - } - - fn transaction_end(tstate: Self::TransactionState, _how: TransactionEnd) -> Arc { - // we don't care about aborts because they are not during clean-up - tstate - } - fn add( - tstate: &mut Self::TransactionState, + &mut self, _object_store: Arc, info: CatalogParquetInfo, ) -> crate::catalog::Result<()> { - tstate - .files + self.files .lock() .expect("lock poissened?") .insert(info.path); Ok(()) } - fn remove( - _tstate: &mut Self::TransactionState, - _path: DirsAndFileName, - ) -> crate::catalog::Result<()> { + fn remove(&mut self, _path: DirsAndFileName) -> crate::catalog::Result<()> { // Do NOT remove the file since we still need it for time travel Ok(()) } @@ -184,7 +164,7 @@ mod tests { let server_id = make_server_id(); let db_name = "db1"; - let (catalog, state) = PreservedCatalog::new_empty::( + let (catalog, _state) = PreservedCatalog::new_empty::( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -194,7 +174,7 @@ mod tests { .unwrap(); // run clean-up - cleanup_unreferenced_parquet_files(&catalog, state, 1_000) + cleanup_unreferenced_parquet_files(&catalog, 1_000) .await .unwrap(); } @@ -205,7 +185,7 @@ mod tests { let server_id = make_server_id(); let db_name = db_name(); - let (catalog, mut state) = PreservedCatalog::new_empty::( + let (catalog, _state) = PreservedCatalog::new_empty::( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -218,7 +198,7 @@ mod tests { let mut paths_keep = vec![]; let mut paths_delete = vec![]; { - let mut transaction = catalog.open_transaction(state).await; + let mut transaction = catalog.open_transaction().await; // an ordinary tracked parquet file => keep let (path, md) = make_metadata(&object_store, "foo", chunk_addr(1)).await; @@ -242,11 +222,11 @@ mod tests { let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(3)).await; paths_delete.push(path.display()); - state = transaction.commit(None).await.unwrap(); + transaction.commit(None).await.unwrap(); } // run clean-up - cleanup_unreferenced_parquet_files(&catalog, state, 1_000) + cleanup_unreferenced_parquet_files(&catalog, 1_000) .await .unwrap(); @@ -266,7 +246,7 @@ mod tests { let server_id = make_server_id(); let db_name = db_name(); - let (catalog, state) = PreservedCatalog::new_empty::( + let (catalog, _state) = PreservedCatalog::new_empty::( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -279,7 +259,7 @@ mod tests { for i in 0..100 { let (path, _) = tokio::join!( async { - let mut transaction = catalog.open_transaction(Arc::clone(&state)).await; + let mut transaction = catalog.open_transaction().await; let (path, md) = make_metadata(&object_store, "foo", chunk_addr(i)).await; transaction.add_parquet(&path.clone().into(), &md).unwrap(); @@ -289,7 +269,7 @@ mod tests { path.display() }, async { - cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 1_000) + cleanup_unreferenced_parquet_files(&catalog, 1_000) .await .unwrap(); }, @@ -306,7 +286,7 @@ mod tests { let server_id = make_server_id(); let db_name = db_name(); - let (catalog, state) = PreservedCatalog::new_empty::( + let (catalog, _state) = PreservedCatalog::new_empty::( 
Arc::clone(&object_store), server_id, db_name.to_string(), @@ -323,7 +303,7 @@ mod tests { } // run clean-up - cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 2) + cleanup_unreferenced_parquet_files(&catalog, 2) .await .unwrap(); @@ -333,7 +313,7 @@ mod tests { assert_eq!(leftover.len(), 1); // run clean-up again - cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 2) + cleanup_unreferenced_parquet_files(&catalog, 2) .await .unwrap(); diff --git a/parquet_file/src/rebuild.rs b/parquet_file/src/rebuild.rs index 25103c9af1..50af2cbf25 100644 --- a/parquet_file/src/rebuild.rs +++ b/parquet_file/src/rebuild.rs @@ -1,6 +1,7 @@ //! Contains code to rebuild a catalog from files. use std::{ collections::{hash_map::Entry, HashMap}, + fmt::Debug, sync::Arc, }; @@ -15,7 +16,7 @@ use snafu::{ResultExt, Snafu}; use uuid::Uuid; use crate::{ - catalog::{CatalogState, CheckpointData, PreservedCatalog}, + catalog::{CatalogParquetInfo, CatalogState, CheckpointData, PreservedCatalog}, metadata::{IoxMetadata, IoxParquetMetaData}, }; #[derive(Debug, Snafu)] @@ -54,7 +55,7 @@ pub enum Error { FileRecordFailure { source: crate::catalog::Error }, #[snafu(display("Cannot commit transaction: {}", source))] - CommitFailure { source: crate::catalog::Error }, + CommitFailure { source: crate::catalog::CommitError }, } pub type Result = std::result::Result; @@ -98,17 +99,22 @@ pub async fn rebuild_catalog( ignore_metadata_read_failure: bool, ) -> Result<(PreservedCatalog, Arc)> where - S: CatalogState + Send + Sync, + S: CatalogState + Debug + Send + Sync, { // collect all revisions from parquet files let mut revisions = collect_revisions(&object_store, search_location, ignore_metadata_read_failure).await?; // create new empty catalog - let (catalog, mut state) = - PreservedCatalog::new_empty::(object_store, server_id, db_name, catalog_empty_input) - .await - .context(NewEmptyFailure)?; + let (catalog, state) = PreservedCatalog::new_empty::( + Arc::clone(&object_store), + server_id, + db_name, + catalog_empty_input, + ) + .await + .context(NewEmptyFailure)?; + let mut state = Arc::try_unwrap(state).expect("dangling Arc?"); // trace all files for final checkpoint let mut collected_files = HashMap::new(); @@ -124,10 +130,19 @@ where if let Some((uuid, entries)) = revisions.remove(&revision_counter) { // we have files for this particular transaction - let mut transaction = catalog.open_transaction_with_uuid(uuid, state).await; + let mut transaction = catalog.open_transaction_with_uuid(uuid).await; for (path, metadata) in entries { let path: DirsAndFileName = path.clone().into(); + state + .add( + Arc::clone(&object_store), + CatalogParquetInfo { + path: path.clone(), + metadata: Arc::new(metadata.clone()), + }, + ) + .context(FileRecordFailure)?; transaction .add_parquet(&path, &metadata) .context(FileRecordFailure)?; @@ -137,7 +152,7 @@ where let checkpoint_data = (revision_counter == max_revision).then(|| CheckpointData { files: collected_files.clone(), }); - state = transaction + transaction .commit(checkpoint_data) .await .context(CommitFailure)?; @@ -145,13 +160,13 @@ where // we do not have any files for this transaction (there might have been other actions though or it was // an empty transaction) => create new empty transaction // Note that this can never be the last transaction, so we don't need to create a checkpoint here. 
- let transaction = catalog.open_transaction(state).await; - state = transaction.commit(None).await.context(CommitFailure)?; + let transaction = catalog.open_transaction().await; + transaction.commit(None).await.context(CommitFailure)?; } } } - Ok((catalog, state)) + Ok((catalog, Arc::new(state))) } /// Collect all files under the given locations. @@ -278,7 +293,7 @@ mod tests { let db_name = "db1"; // build catalog with some data - let (catalog, mut state) = PreservedCatalog::new_empty::( + let (catalog, state) = PreservedCatalog::new_empty::( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -286,8 +301,9 @@ mod tests { ) .await .unwrap(); + let mut state = Arc::try_unwrap(state).unwrap(); { - let mut transaction = catalog.open_transaction(state).await; + let mut transaction = catalog.open_transaction().await; let (path, md) = create_parquet_file( &object_store, @@ -298,6 +314,9 @@ mod tests { 0, ) .await; + state + .parquet_files + .insert(path.clone(), Arc::new(md.clone())); transaction.add_parquet(&path, &md).unwrap(); let (path, md) = create_parquet_file( @@ -309,17 +328,20 @@ mod tests { 1, ) .await; + state + .parquet_files + .insert(path.clone(), Arc::new(md.clone())); transaction.add_parquet(&path, &md).unwrap(); - state = transaction.commit(None).await.unwrap(); + transaction.commit(None).await.unwrap(); } { // empty transaction - let transaction = catalog.open_transaction(state).await; - state = transaction.commit(None).await.unwrap(); + let transaction = catalog.open_transaction().await; + transaction.commit(None).await.unwrap(); } { - let mut transaction = catalog.open_transaction(state).await; + let mut transaction = catalog.open_transaction().await; let (path, md) = create_parquet_file( &object_store, @@ -330,9 +352,12 @@ mod tests { 2, ) .await; + state + .parquet_files + .insert(path.clone(), Arc::new(md.clone())); transaction.add_parquet(&path, &md).unwrap(); - state = transaction.commit(None).await.unwrap(); + transaction.commit(None).await.unwrap(); } // store catalog state @@ -459,7 +484,7 @@ mod tests { let db_name = "db1"; // build catalog with same data - let (catalog, state) = PreservedCatalog::new_empty::( + let (catalog, _state) = PreservedCatalog::new_empty::( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -468,7 +493,7 @@ mod tests { .await .unwrap(); { - let mut transaction = catalog.open_transaction(state).await; + let mut transaction = catalog.open_transaction().await; let (path, md) = create_parquet_file( &object_store, @@ -577,7 +602,7 @@ mod tests { let db_name = "db1"; // build catalog with some data (2 transactions + initial empty one) - let (catalog, mut state) = PreservedCatalog::new_empty::( + let (catalog, state) = PreservedCatalog::new_empty::( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -585,8 +610,9 @@ mod tests { ) .await .unwrap(); + let mut state = Arc::try_unwrap(state).unwrap(); { - let mut transaction = catalog.open_transaction(state).await; + let mut transaction = catalog.open_transaction().await; let (path, md) = create_parquet_file( &object_store, @@ -597,12 +623,15 @@ mod tests { 0, ) .await; + state + .parquet_files + .insert(path.clone(), Arc::new(md.clone())); transaction.add_parquet(&path, &md).unwrap(); - state = transaction.commit(None).await.unwrap(); + transaction.commit(None).await.unwrap(); } { - let mut transaction = catalog.open_transaction(state).await; + let mut transaction = catalog.open_transaction().await; let (path, md) = create_parquet_file( &object_store, @@ -613,9 
+642,12 @@ mod tests {
             2,
         )
         .await;
+        state
+            .parquet_files
+            .insert(path.clone(), Arc::new(md.clone()));
         transaction.add_parquet(&path, &md).unwrap();
 
-        state = transaction.commit(None).await.unwrap();
+        transaction.commit(None).await.unwrap();
     }
 
     assert_eq!(catalog.revision_counter(), 2);

diff --git a/server/src/db.rs b/server/src/db.rs
index b7c4327aac..7a90b4745d 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -12,6 +12,7 @@ use arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use async_trait::async_trait;
 use catalog::{chunk::CatalogChunk, Catalog};
 pub(crate) use chunk::DbChunk;
+use data_types::chunk_metadata::ChunkAddr;
 use data_types::{
     chunk_metadata::ChunkSummary,
     database_rules::DatabaseRules,
@@ -30,7 +31,7 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
 use object_store::{path::parsed::DirsAndFileName, ObjectStore};
 use observability_deps::tracing::{debug, error, info, warn};
 use parking_lot::RwLock;
-use parquet_file::catalog::{CheckpointData, TransactionEnd};
+use parquet_file::catalog::CheckpointData;
 use parquet_file::{
     catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
     chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
@@ -42,7 +43,7 @@ use query::{exec::Executor, predicate::Predicate, QueryDatabase};
 use rand_distr::{Distribution, Poisson};
 use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
 use snafu::{ResultExt, Snafu};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::future::Future;
 use std::{
     any::Any,
@@ -143,11 +144,22 @@ pub enum Error {
     #[snafu(display("Error building sequenced entry: {}", source))]
     SequencedEntryError { source: entry::SequencedEntryError },
 
+    #[snafu(display("Error while creating parquet chunk: {}", source))]
+    ParquetChunkError { source: parquet_file::chunk::Error },
+
     #[snafu(display("Error while handling transaction on preserved catalog: {}", source))]
     TransactionError {
         source: parquet_file::catalog::Error,
     },
 
+    #[snafu(display("Error while committing transaction on preserved catalog: {}", source))]
+    CommitError {
+        source: parquet_file::catalog::CommitError,
+    },
+
+    #[snafu(display("Cannot write chunk: {}", addr))]
+    CannotWriteChunk { addr: ChunkAddr },
+
     #[snafu(display("background task cancelled: {}", source))]
     TaskCancelled { source: futures::future::Aborted },
 }
@@ -595,9 +607,6 @@ impl Db {
 
         debug!(chunk=%guard.addr(), "chunk marked WRITING, loading tables into object store");
 
-        // Drop locks
-        let chunk = guard.unwrap().chunk;
-
         // Create a storage to save data of this chunk
         let storage = Storage::new(
             Arc::clone(&db.store),
@@ -613,6 +622,10 @@ impl Db {
 
         let preserved_catalog = Arc::clone(&db.preserved_catalog);
         let catalog = Arc::clone(&db.catalog);
+        let object_store = Arc::clone(&db.store);
+
+        // Drop locks
+        let chunk = guard.unwrap().chunk;
 
         let fut = async move {
             let table_name = table_summary.name.as_str();
@@ -632,9 +645,22 @@ impl Db {
                 streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
             );
 
+            // check that the upcoming state change will very likely succeed
+            {
+                // re-lock
+                let guard = chunk.read();
+                if matches!(guard.stage(), &ChunkStage::Persisted { ..
}) + || !guard.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting) + { + return Err(Error::CannotWriteChunk { + addr: guard.addr().clone(), + }); + } + } + // catalog-level transaction for preservation layer { - let mut transaction = preserved_catalog.open_transaction(catalog).await; + let mut transaction = preserved_catalog.open_transaction().await; // Write this table data into the object store // @@ -659,6 +685,23 @@ impl Db { ) .await .context(WritingToObjectStore)?; + let parquet_metadata = Arc::new(parquet_metadata); + + let metrics = catalog + .metrics_registry + .register_domain_with_labels("parquet", catalog.metric_labels.clone()); + let metrics = + ParquetChunkMetrics::new(&metrics, catalog.metrics().memory().parquet()); + let parquet_chunk = Arc::new( + ParquetChunk::new( + path.clone(), + object_store, + Arc::clone(&parquet_metadata), + metrics, + ) + .context(ParquetChunkError)?, + ); + let path: DirsAndFileName = path.into(); transaction @@ -670,19 +713,35 @@ impl Db { transaction.revision_counter() % interval.get() == 0 }); let checkpoint_data = create_checkpoint.then(|| { - let mut checkpoint_data = - checkpoint_data_from_catalog(&transaction.tstate().catalog); + let mut checkpoint_data = checkpoint_data_from_catalog(&catalog); // don't forget the file that we've just added - checkpoint_data - .files - .insert(path, Arc::new(parquet_metadata)); + checkpoint_data.files.insert(path, parquet_metadata); checkpoint_data }); - transaction - .commit(checkpoint_data) - .await - .context(TransactionError)?; + match transaction.commit(checkpoint_data).await { + Ok(()) => { + let mut guard = chunk.write(); + if let Err(e) = guard.set_written_to_object_store(parquet_chunk) { + panic!("Chunk written but cannot mark as written {}", e); + } + } + Err(e @ parquet_file::catalog::CommitError::CheckpointFailed { .. }) => { + warn!(%e, "cannot create catalog checkpoint"); + + // still mark chunk as persisted + let mut guard = chunk.write(); + if let Err(e) = guard.set_written_to_object_store(parquet_chunk) { + panic!("Chunk written but cannot mark as written {}", e); + } + } + Err(e @ parquet_file::catalog::CommitError::CommitFailed { .. }) => { + warn!(%e, "cannot create catalog transaction"); + + // do NOT mark chunk as persisted + return Err(Error::CommitError { source: e }); + } + } } // We know this chunk is ParquetFile type @@ -804,7 +863,7 @@ impl Db { debug!(?duration, "cleanup worker sleeps"); tokio::time::sleep(duration).await; - if let Err(e) = cleanup_unreferenced_parquet_files(&self.preserved_catalog, Arc::clone(&self.catalog), 1_000).await { + if let Err(e) = cleanup_unreferenced_parquet_files(&self.preserved_catalog, 1_000).await { error!(%e, "error in background cleanup task"); } } => {}, @@ -1016,47 +1075,6 @@ pub struct CatalogEmptyInput { metric_labels: Vec, } -#[derive(Debug)] -enum TransactionCommitAction { - DropChunk { - table_name: String, - partition_key: String, - chunk_id: u32, - }, - NewChunk { - table_name: String, - partition_key: String, - chunk_id: u32, - inner: Arc, - }, - SetWritten { - table_name: String, - partition_key: String, - chunk_id: u32, - inner: Arc, - }, -} - -/// Helper to manage transaction on the in-memory catalog. -#[derive(Debug)] -pub struct TransactionState { - /// Inner catalog used during this transaction. - catalog: Arc, - - /// Actions that will be performed on successful commit. These are pre-checked and should not result in any errors. 
-    commit_actions: Vec<TransactionCommitAction>,
-
-    /// New files that are to be added during this transaction with table, partition key and chunk ID.
-    ///
-    /// This only contains files that were not (yet) removed during the same transaction.
-    new_files: HashMap<DirsAndFileName, (String, String, u32)>,
-
-    /// Files removed during this transaction.
-    ///
-    /// This only contains files that were not (yet) re-added during the same transaction.
-    removed_files: HashSet<DirsAndFileName>,
-}
-
 impl CatalogState for Catalog {
     type EmptyInput = CatalogEmptyInput;
@@ -1069,89 +1087,8 @@ impl CatalogState for Catalog {
         )
     }
 
-    type TransactionState = TransactionState;
-
-    fn transaction_begin(origin: &Arc<Self>) -> Self::TransactionState {
-        TransactionState {
-            catalog: Arc::clone(origin),
-            commit_actions: vec![],
-            new_files: HashMap::new(),
-            removed_files: HashSet::new(),
-        }
-    }
-
-    fn transaction_end(tstate: Self::TransactionState, how: TransactionEnd) -> Arc<Self> {
-        let TransactionState {
-            catalog,
-            commit_actions,
-            ..
-        } = tstate;
-
-        if matches!(how, TransactionEnd::Commit) {
-            for action in commit_actions {
-                match action {
-                    TransactionCommitAction::DropChunk {
-                        table_name,
-                        partition_key,
-                        chunk_id,
-                    } => {
-                        // TODO: Should this really be infallible?
-                        if let Ok(partition) = catalog.partition(&table_name, &partition_key) {
-                            let mut partition = partition.write();
-                            let _ = partition.drop_chunk(chunk_id);
-                        }
-
-                        debug!(%table_name, %partition_key, chunk_id, "removed chunk according to persisted catalog");
-                    }
-                    TransactionCommitAction::NewChunk {
-                        table_name,
-                        partition_key,
-                        chunk_id,
-                        inner,
-                    } => {
-                        let partition = catalog
-                            .get_or_create_partition(table_name.clone(), partition_key.clone());
-                        let mut partition = partition.write();
-                        partition.insert_object_store_only_chunk(chunk_id, inner);
-                        debug!(%table_name, %partition_key, chunk_id, "recovered chunk from persisted catalog");
-                    }
-                    TransactionCommitAction::SetWritten {
-                        table_name,
-                        partition_key,
-                        chunk_id,
-                        inner,
-                    } => {
-                        let partition = catalog
-                            .get_or_create_partition(table_name.clone(), partition_key.clone());
-                        let partition = partition.read();
-
-                        match partition.chunk(chunk_id) {
-                            Some(chunk) => {
-                                let mut chunk = chunk.write();
-
-                                match chunk.set_written_to_object_store(inner) {
-                                    Ok(()) => {
-                                        debug!(%table_name, %partition_key, chunk_id, "chunk marked WRITTEN. Persisting to object store complete");
-                                    }
-                                    Err(e) => {
-                                        warn!(%e, %table_name, %partition_key, chunk_id, "chunk state changed during transaction even though lifecycle action was present");
-                                    }
-                                }
-                            }
-                            None => {
-                                warn!(%table_name, %partition_key, chunk_id, "chunk is gone during transaction even though lifecycle action was present");
-                            }
-                        }
-                    }
-                };
-            }
-        }
-
-        catalog
-    }
-
     fn add(
-        tstate: &mut Self::TransactionState,
+        &mut self,
         object_store: Arc<ObjectStore>,
         info: CatalogParquetInfo,
     ) -> parquet_file::catalog::Result<()> {
@@ -1166,13 +1103,11 @@ impl CatalogState for Catalog {
         })?;
 
         // Create a parquet chunk for this chunk
-        let metrics = tstate
-            .catalog
+        let metrics = self
             .metrics_registry
-            .register_domain_with_labels("parquet", tstate.catalog.metric_labels.clone());
+            .register_domain_with_labels("parquet", self.metric_labels.clone());
 
-        let metrics =
-            ParquetChunkMetrics::new(&metrics, tstate.catalog.metrics().memory().parquet());
+        let metrics = ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet());
         let parquet_chunk = ParquetChunk::new(
             object_store.path_from_dirs_and_filename(info.path.clone()),
             object_store,
@@ -1186,128 +1121,45 @@ impl CatalogState for Catalog {
 
         // Get partition from the catalog
         // Note that the partition might not exist yet if the chunk is loaded from an existing preserved catalog.
-        let partition = tstate
-            .catalog
-            .get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
-        let partition_guard = partition.read();
-
-        if tstate.new_files.contains_key(&info.path) {
+        let partition = self.get_or_create_partition(&iox_md.table_name, &iox_md.partition_key);
+        let mut partition = partition.write();
+        if partition.chunk(iox_md.chunk_id).is_some() {
             return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists { path: info.path });
         }
-
-        // Get the chunk from the catalog
-        match (
-            tstate.removed_files.remove(&info.path),
-            partition_guard.chunk(iox_md.chunk_id),
-        ) {
-            (false, Some(chunk)) => {
-                // Chunk exists => should be in frozen stage and will transition from there
-
-                // Relock the chunk again (nothing else should have been able
-                // to modify the chunk state while we were moving it
-                let chunk = chunk.read();
-
-                // check if chunk already exists
-                if matches!(chunk.stage(), &ChunkStage::Persisted { .. }) {
-                    return Err(parquet_file::catalog::Error::ParquetFileAlreadyExists {
-                        path: info.path,
-                    });
-                }
-
-                // check that the upcoming state change will very likely succeed
-                if !chunk.is_in_lifecycle(::lifecycle::ChunkLifecycleAction::Persisting) {
-                    return Err(parquet_file::catalog::Error::CatalogStateFailure {
-                        source: Box::new(
-                            crate::db::catalog::chunk::Error::UnexpectedLifecycleAction {
-                                chunk: chunk.addr().clone(),
-                                expected: "persisting".to_string(),
-                                actual: chunk
-                                    .lifecycle_action()
-                                    .map_or("n/a", |action| action.metadata().name())
-                                    .to_string(),
-                            },
-                        ),
-                        path: info.path,
-                    });
-                }
-
-                // update the catalog to say we are done processing
-                tstate
-                    .commit_actions
-                    .push(TransactionCommitAction::SetWritten {
-                        table_name: iox_md.table_name.clone(),
-                        partition_key: iox_md.partition_key.clone(),
-                        chunk_id: iox_md.chunk_id,
-                        inner: parquet_chunk,
-                    });
-                tstate.new_files.insert(
-                    info.path,
-                    (iox_md.table_name, iox_md.partition_key, iox_md.chunk_id),
-                );
-            }
-            _ => {
-                // table unknown => that's ok, create chunk in "object store only" stage which will also create the table
-                // table chunk, but table already known => that's ok, create chunk in "object store only" stage
-                tstate
-                    .commit_actions
-                    .push(TransactionCommitAction::NewChunk {
-                        table_name: iox_md.table_name.clone(),
-                        partition_key: iox_md.partition_key.clone(),
-                        chunk_id: iox_md.chunk_id,
-                        inner: parquet_chunk,
-                    });
-                tstate.new_files.insert(
-                    info.path,
-                    (iox_md.table_name, iox_md.partition_key, iox_md.chunk_id),
-                );
-            }
-        }
+        partition.insert_object_store_only_chunk(iox_md.chunk_id, parquet_chunk);
 
         Ok(())
     }
 
-    fn remove(
-        tstate: &mut Self::TransactionState,
-        path: DirsAndFileName,
-    ) -> parquet_file::catalog::Result<()> {
-        if tstate.removed_files.contains(&path) {
-            return Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path });
-        }
+    fn remove(&mut self, path: DirsAndFileName) -> parquet_file::catalog::Result<()> {
+        let mut removed_any = false;
 
-        let mut actions: Vec<TransactionCommitAction> = vec![];
-
-        for partition in tstate.catalog.partitions() {
-            let partition = partition.read();
+        for partition in self.partitions() {
+            let mut partition = partition.write();
+            let mut to_remove = vec![];
 
             for chunk in partition.chunks() {
                 let chunk = chunk.read();
                 if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
                    let chunk_path: DirsAndFileName = parquet.path().into();
                    if path == chunk_path {
-                        actions.push(TransactionCommitAction::DropChunk {
-                            table_name: partition.table_name().to_string(),
-                            partition_key: partition.key().to_string(),
-                            chunk_id: chunk.id(),
-                        });
+                        to_remove.push(chunk.id());
                    }
                }
            }
+
+            for chunk_id in to_remove {
+                if let Err(e) = partition.drop_chunk(chunk_id) {
+                    panic!("Chunk is gone while we hold a partition lock: {}", e);
+                }
+                removed_any = true;
+            }
        }
 
-        if let Some((table_name, partition_key, chunk_id)) = tstate.new_files.remove(&path) {
-            actions.push(TransactionCommitAction::DropChunk {
-                table_name,
-                partition_key,
-                chunk_id,
-            });
-        }
-
-        if actions.is_empty() {
-            Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path })
-        } else {
-            tstate.commit_actions.append(&mut actions);
-            tstate.removed_files.insert(path);
+        if removed_any {
             Ok(())
+        } else {
+            Err(parquet_file::catalog::Error::ParquetFileDoesNotExist { path })
        }
    }
 }

From d2be64186411fffdf8f6b019f3b3335f61743a29 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 22 Jun 2021 10:40:23 +0200
Subject: [PATCH 2/4] refactor: make checkpointing easier to use

Don't mix commit+checkpoint into a single call; doing so forces the
caller to reason about the error type and which of the two operations
has failed. Splitting them also makes it easier to create the correct
checkpoint data.
---
 parquet_file/src/catalog.rs | 252 ++++++++++++++++++++----------------
 parquet_file/src/cleanup.rs |   4 +-
 parquet_file/src/rebuild.rs |  35 ++---
 server/src/db.rs            |  53 ++++----
 4 files changed, 185 insertions(+), 159 deletions(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index c0f82156b7..f89ecb2c83 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -187,7 +187,7 @@ pub enum Error {
     },
 
     #[snafu(display("Cannot commit transaction: {}", source))]
-    TransactionCommitError { source: CommitError },
+    CommitError { source: Box<Error> },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -338,9 +338,10 @@ impl PreservedCatalog {
         // add empty transaction
         let transaction = catalog.open_transaction().await;
         transaction
-            .commit(None)
+            .commit()
             .await
-            .context(TransactionCommitError)?;
+            .map_err(Box::new)
+            .context(CommitError)?;
 
         Ok((catalog, state))
     }
@@ -958,26 +959,14 @@ pub struct CheckpointData {
     pub files: HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>,
 }
 
-#[derive(Debug, Snafu)]
-pub enum CommitError {
-    #[snafu(display("Cannot write transaction: {}", source))]
-    CommitFailed { source: Box<Error> },
-
-    #[snafu(display("Cannot write checkpoint (transaction was written!): {}", source))]
-    CheckpointFailed { source: Box<Error> },
-}
-
 /// Handle for an open uncommitted transaction.
 ///
 /// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning.
 pub struct TransactionHandle<'c> {
     catalog: &'c PreservedCatalog,
 
-    // NOTE: The permit is technically used since we use it to reference the semaphore. It implements `drop` which we
-    //       rely on.
-    #[allow(dead_code)]
-    permit: SemaphorePermit<'c>,
-
+    // NOTE: The following two must be `Option`s so we can `take` them during `Self::commit`.
+    permit: Option<SemaphorePermit<'c>>,
     transaction: Option<OpenTransaction>,
 }
@@ -1002,7 +991,7 @@ impl<'c> TransactionHandle<'c> {
         Self {
             catalog,
             transaction: Some(transaction),
-            permit,
+            permit: Some(permit),
         }
     }
@@ -1026,26 +1015,15 @@ impl<'c> TransactionHandle<'c> {
     /// Write data to object store and commit transaction to underlying catalog.
     ///
-    /// This will first commit to object store and then to the in-memory state.
-    ///
     /// # Checkpointing
-    /// If `checkpoint_data` is passed this will also create a checkpoint at the end of the commit. Note that if the
-    /// checkpoint creation fails, the commit will still be treated as completed since the checkpoint is a mere
-    /// optimization to speed up transaction replay and allow to prune the history.
+    /// A [`CheckpointHandle`] will be returned that allows the caller to create a checkpoint. Note that this handle
+    /// holds a transaction lock, so it's safe to assume that no other transaction is in-progress while the caller
+    /// prepares the checkpoint.
     ///
-    /// Note that `checkpoint_data` must contain the state INCLUDING the to-be-commited transaction.
-    ///
-    /// # Failure Handling
-    /// When this function returns with an error, the caller MUST react accordingly:
-    ///
-    /// - [`CommitError::CommitFailed`]: It MUST be assumed that the commit has failed and all actions recorded
-    ///   with this handle are NOT preserved.
-    /// - [`CommitError::CheckpointFailed`]: The actual transaction was written. Just the checkpoint was not
-    ///   written. It MUST be assumed that all action recorded with this handle are preserved.
-    pub async fn commit(
-        mut self,
-        checkpoint_data: Option<CheckpointData>,
-    ) -> std::result::Result<(), CommitError> {
+    /// # Error Handling
+    /// When this function returns with an error, it MUST be assumed that the commit has failed and all actions
+    /// recorded with this handle are NOT preserved.
+    pub async fn commit(mut self) -> Result<CheckpointHandle<'c>> {
         let t = std::mem::take(&mut self.transaction)
             .expect("calling .commit on a closed transaction?!");
         let tkey = t.tkey();
@@ -1067,23 +1045,17 @@ impl<'c> TransactionHandle<'c> {
                 // maybe create a checkpoint
                 // IMPORTANT: Create the checkpoint AFTER committing the transaction to object store and to the in-memory state.
                 // Checkpoints are an optional optimization and are not required to materialize a transaction.
-                if let Some(checkpoint_data) = checkpoint_data {
-                    // NOTE: `inner_guard` will not be re-used here since it is a strong write lock and the checkpoint creation
-                    // only needs a read lock.
-                    self.create_checkpoint(tkey, previous_tkey, checkpoint_data)
-                        .await
-                        .map_err(Box::new)
-                        .context(CheckpointFailed)?;
-                }
-
-                Ok(())
+                Ok(CheckpointHandle {
+                    catalog: self.catalog,
+                    tkey,
+                    previous_tkey,
+                    permit: self.permit.take().expect("transaction already dropped?!"),
+                })
             }
             Err(e) => {
                 warn!(?tkey, "failure while writing transaction, aborting");
                 t.abort();
-                Err(CommitError::CommitFailed {
-                    source: Box::new(e),
-                })
+                Err(e)
             }
         }
     }
@@ -1100,61 +1072,6 @@ impl<'c> TransactionHandle<'c> {
         t.commit(&mut previous_tkey_guard)
     }
 
-    async fn create_checkpoint(
-        &self,
-        tkey: TransactionKey,
-        previous_tkey: Option<TransactionKey>,
-        checkpoint_data: CheckpointData,
-    ) -> Result<()> {
-        let object_store = self.catalog.object_store();
-        let server_id = self.catalog.server_id();
-        let db_name = self.catalog.db_name();
-
-        // sort by key (= path) for deterministic output
-        let files = {
-            let mut tmp: Vec<_> = checkpoint_data.files.into_iter().collect();
-            tmp.sort_by_key(|(path, _metadata)| path.clone());
-            tmp
-        };
-
-        // create transaction to add parquet files
-        let actions: Result<Vec<proto::transaction::Action>, Error> = files
-            .into_iter()
-            .map(|(path, metadata)| {
-                Ok(proto::transaction::Action {
-                    action: Some(proto::transaction::action::Action::AddParquet(
-                        proto::AddParquet {
-                            path: Some(unparse_dirs_and_filename(&path)),
-                            metadata: metadata.to_thrift().context(MetadataEncodingFailed)?,
-                        },
-                    )),
-                })
-            })
-            .collect();
-        let actions = actions?;
-
-        // assemble and store checkpoint protobuf
-        let proto = proto::Transaction {
-            actions,
-            version: TRANSACTION_VERSION,
-            uuid: tkey.uuid.to_string(),
-            revision_counter: tkey.revision_counter,
-            previous_uuid: previous_tkey.map_or_else(String::new, |tkey| tkey.uuid.to_string()),
-            start_timestamp: Some(Utc::now().into()),
-            encoding: proto::transaction::Encoding::Full.into(),
-        };
-        let path = file_path(
-            &object_store,
-            server_id,
-            db_name,
-            &tkey,
-            FileType::Checkpoint,
-        );
-        store_transaction_proto(&object_store, &path, &proto).await?;
-
-        Ok(())
-    }
-
     /// Abort transaction w/o commit.
     pub fn abort(mut self) {
         let t = std::mem::take(&mut self.transaction)
@@ -1218,6 +1135,101 @@ impl<'c> Drop for TransactionHandle<'c> {
     }
 }
 
+/// Handle that allows creating a checkpoint after a transaction.
+///
+/// This handle holds a transaction lock.
+pub struct CheckpointHandle<'c> {
+    catalog: &'c PreservedCatalog,
+
+    // metadata about the just-committed transaction
+    tkey: TransactionKey,
+    previous_tkey: Option<TransactionKey>,
+
+    // NOTE: The permit is technically used since we use it to reference the semaphore. It implements `drop` which we
+    //       rely on.
+    #[allow(dead_code)]
+    permit: SemaphorePermit<'c>,
+}
+
+impl<'c> CheckpointHandle<'c> {
+    /// Create a checkpoint for the just-committed transaction.
+    ///
+    /// Note that `checkpoint_data` must contain the state INCLUDING the just-committed transaction.
+    ///
+    /// # Error Handling
+    /// If the checkpoint creation fails, the commit will still be treated as completed since the checkpoint is a mere
+    /// optimization to speed up transaction replay and allow pruning of the history.
+    pub async fn create_checkpoint(self, checkpoint_data: CheckpointData) -> Result<()> {
+        let object_store = self.catalog.object_store();
+        let server_id = self.catalog.server_id();
+        let db_name = self.catalog.db_name();
+
+        // sort by key (= path) for deterministic output
+        let files = {
+            let mut tmp: Vec<_> = checkpoint_data.files.into_iter().collect();
+            tmp.sort_by_key(|(path, _metadata)| path.clone());
+            tmp
+        };
+
+        // create transaction to add parquet files
+        let actions: Result<Vec<proto::transaction::Action>, Error> = files
+            .into_iter()
+            .map(|(path, metadata)| {
+                Ok(proto::transaction::Action {
+                    action: Some(proto::transaction::action::Action::AddParquet(
+                        proto::AddParquet {
+                            path: Some(unparse_dirs_and_filename(&path)),
+                            metadata: metadata.to_thrift().context(MetadataEncodingFailed)?,
+                        },
+                    )),
+                })
+            })
+            .collect();
+        let actions = actions?;
+
+        // assemble and store checkpoint protobuf
+        let proto = proto::Transaction {
+            actions,
+            version: TRANSACTION_VERSION,
+            uuid: self.tkey.uuid.to_string(),
+            revision_counter: self.tkey.revision_counter,
+            previous_uuid: self
+                .previous_tkey
+                .map_or_else(String::new, |tkey| tkey.uuid.to_string()),
+            start_timestamp: Some(Utc::now().into()),
+            encoding: proto::transaction::Encoding::Full.into(),
+        };
+        let path = file_path(
+            &object_store,
+            server_id,
+            db_name,
+            &self.tkey,
+            FileType::Checkpoint,
+        );
+        store_transaction_proto(&object_store, &path, &proto).await?;
+
+        Ok(())
+    }
+
+    /// Get revision counter for this transaction.
+    pub fn revision_counter(&self) -> u64 {
+        self.tkey.revision_counter
+    }
+
+    /// Get UUID for this transaction.
+    pub fn uuid(&self) -> Uuid {
+        self.tkey.uuid
+    }
+}
+
+impl<'c> Debug for CheckpointHandle<'c> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CheckpointHandle")
+            .field("tkey", &self.tkey)
+            .finish()
+    }
+}
+
 pub mod test_helpers {
     use object_store::parsed_path;
@@ -2504,8 +2516,11 @@ mod tests {
         // create empty transaction w/ checkpoint (the delta transaction file is not required for catalog loading)
         {
             let transaction = catalog.open_transaction().await;
-            let checkpoint_data = Some(state.checkpoint_data());
-            transaction.commit(checkpoint_data).await.unwrap();
+            let ckpt_handle = transaction.commit().await.unwrap();
+            ckpt_handle
+                .create_checkpoint(state.checkpoint_data())
+                .await
+                .unwrap();
         }
 
         trace.record(&catalog, &state, false);
@@ -2517,8 +2532,11 @@ mod tests {
                 .parquet_files
                 .insert(path.clone(), Arc::new(metadata.clone()));
             transaction.add_parquet(&path, &metadata).unwrap();
-            let checkpoint_data = Some(state.checkpoint_data());
-            transaction.commit(checkpoint_data).await.unwrap();
+            let ckpt_handle = transaction.commit().await.unwrap();
+            ckpt_handle
+                .create_checkpoint(state.checkpoint_data())
+                .await
+                .unwrap();
         }
 
         trace.record(&catalog, &state, false);
@@ -2720,7 +2738,7 @@ mod tests {
                 .insert(path.clone(), Arc::new(metadata1.clone()));
             t.add_parquet(&path, &metadata1).unwrap();
 
-            t.commit(None).await.unwrap();
+            t.commit().await.unwrap();
         }
         assert_eq!(catalog.revision_counter(), 1);
         assert_catalog_parquet_files(
@@ -2749,7 +2767,7 @@ mod tests {
             state.parquet_files.remove(&path);
             t.remove_parquet(&path).unwrap();
 
-            t.commit(None).await.unwrap();
+            t.commit().await.unwrap();
         }
         assert_eq!(catalog.revision_counter(), 2);
         assert_catalog_parquet_files(
@@ -3153,8 +3171,11 @@ mod tests {
         // create empty transaction w/ checkpoint
         {
             let transaction = catalog.open_transaction().await;
-            let checkpoint_data = Some(state.checkpoint_data());
-            transaction.commit(checkpoint_data).await.unwrap();
+            let ckpt_handle = transaction.commit().await.unwrap();
+            ckpt_handle
+                .create_checkpoint(state.checkpoint_data())
+                .await
+                .unwrap();
         }
 
         trace.record(&catalog, &state, false);
@@ -3258,8 +3279,11 @@ mod tests {
         // create empty transaction w/ checkpoint
         {
             let transaction = catalog.open_transaction().await;
-            let checkpoint_data = Some(state.checkpoint_data());
-            transaction.commit(checkpoint_data).await.unwrap();
+            let ckpt_handle = transaction.commit().await.unwrap();
+            ckpt_handle
+                .create_checkpoint(state.checkpoint_data())
+                .await
+                .unwrap();
         }
 
         // delete transaction file
diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs
index 66e02aca52..9229ee73f7 100644
--- a/parquet_file/src/cleanup.rs
+++ b/parquet_file/src/cleanup.rs
@@ -222,7 +222,7 @@ mod tests {
             let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(3)).await;
             paths_delete.push(path.display());
 
-            transaction.commit(None).await.unwrap();
+            transaction.commit().await.unwrap();
         }
 
         // run clean-up
@@ -264,7 +264,7 @@ mod tests {
                     let (path, md) = make_metadata(&object_store, "foo", chunk_addr(i)).await;
                     transaction.add_parquet(&path.clone().into(), &md).unwrap();
 
-                    transaction.commit(None).await.unwrap();
+                    transaction.commit().await.unwrap();
 
                     path.display()
                 },
diff --git a/parquet_file/src/rebuild.rs b/parquet_file/src/rebuild.rs
index 50af2cbf25..a41a50c974 100644
--- a/parquet_file/src/rebuild.rs
+++ b/parquet_file/src/rebuild.rs
@@ -55,7 +55,10 @@ pub enum Error {
     FileRecordFailure { source: crate::catalog::Error },
 
     #[snafu(display("Cannot commit transaction: {}", source))]
-    CommitFailure { source: crate::catalog::CommitError },
+    CommitFailure { source: crate::catalog::Error },
+
+    #[snafu(display("Cannot create checkpoint: {}", source))]
+    CheckpointFailure { source: crate::catalog::Error },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -149,19 +152,21 @@ where
                 collected_files.insert(path, Arc::new(metadata));
             }
 
-            let checkpoint_data = (revision_counter == max_revision).then(|| CheckpointData {
-                files: collected_files.clone(),
-            });
-            transaction
-                .commit(checkpoint_data)
-                .await
-                .context(CommitFailure)?;
+            let ckpt_handle = transaction.commit().await.context(CommitFailure)?;
+            if revision_counter == max_revision {
+                ckpt_handle
+                    .create_checkpoint(CheckpointData {
+                        files: collected_files.clone(),
+                    })
+                    .await
+                    .context(CheckpointFailure)?;
+            }
         } else {
             // we do not have any files for this transaction (there might have been other actions though or it was
             // an empty transaction) => create new empty transaction
             // Note that this can never be the last transaction, so we don't need to create a checkpoint here.
            let transaction = catalog.open_transaction().await;
-            transaction.commit(None).await.context(CommitFailure)?;
+            transaction.commit().await.context(CommitFailure)?;
        }
    }
 }
@@ -333,12 +338,12 @@ mod tests {
                .insert(path.clone(), Arc::new(md.clone()));
            transaction.add_parquet(&path, &md).unwrap();
 
-            transaction.commit(None).await.unwrap();
+            transaction.commit().await.unwrap();
        }
        {
            // empty transaction
            let transaction = catalog.open_transaction().await;
-            transaction.commit(None).await.unwrap();
+            transaction.commit().await.unwrap();
        }
        {
            let mut transaction = catalog.open_transaction().await;
@@ -357,7 +362,7 @@ mod tests {
                .insert(path.clone(), Arc::new(md.clone()));
            transaction.add_parquet(&path, &md).unwrap();
 
-            transaction.commit(None).await.unwrap();
+            transaction.commit().await.unwrap();
        }
 
        // store catalog state
@@ -517,7 +522,7 @@ mod tests {
            )
            .await;
 
-            transaction.commit(None).await.unwrap();
+            transaction.commit().await.unwrap();
        }
 
        // wipe catalog
@@ -628,7 +633,7 @@ mod tests {
                .insert(path.clone(), Arc::new(md.clone()));
            transaction.add_parquet(&path, &md).unwrap();
 
-            transaction.commit(None).await.unwrap();
+            transaction.commit().await.unwrap();
        }
        {
            let mut transaction = catalog.open_transaction().await;
@@ -647,7 +652,7 @@ mod tests {
                .insert(path.clone(), Arc::new(md.clone()));
            transaction.add_parquet(&path, &md).unwrap();
 
-            transaction.commit(None).await.unwrap();
+            transaction.commit().await.unwrap();
        }
 
        assert_eq!(catalog.revision_counter(), 2);
diff --git a/server/src/db.rs b/server/src/db.rs
index 7a90b4745d..a9c6b66044 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -154,7 +154,7 @@ pub enum Error {
     #[snafu(display("Error while committing transaction on preserved catalog: {}", source))]
     CommitError {
-        source: parquet_file::catalog::CommitError,
+        source: parquet_file::catalog::Error,
     },
 
     #[snafu(display("Cannot write chunk: {}", addr))]
@@ -708,38 +708,35 @@ impl Db {
             .add_parquet(&path, &parquet_metadata)
             .context(TransactionError)?;
 
+        // preserved commit
+        let ckpt_handle = transaction.commit().await.context(CommitError)?;
+
+        // in-mem commit
+        {
+            let mut guard = chunk.write();
+            if let Err(e) = guard.set_written_to_object_store(parquet_chunk) {
+                panic!("Chunk written but cannot mark as written: {}", e);
+            }
+        }
+
         let create_checkpoint = catalog_transactions_until_checkpoint
             .map_or(false, |interval| {
-                transaction.revision_counter() % interval.get() == 0
+                ckpt_handle.revision_counter() % interval.get() == 0
             });
 
-        let checkpoint_data = create_checkpoint.then(|| {
-            let mut checkpoint_data = checkpoint_data_from_catalog(&catalog);
-            // don't forget the file that we've just added
-            checkpoint_data.files.insert(path, parquet_metadata);
-            checkpoint_data
-        });
-
-        match transaction.commit(checkpoint_data).await {
-            Ok(()) => {
-                let mut guard = chunk.write();
-                if let Err(e) = guard.set_written_to_object_store(parquet_chunk) {
-                    panic!("Chunk written but cannot mark as written {}", e);
-                }
-            }
-            Err(e @ parquet_file::catalog::CommitError::CheckpointFailed { .. }) => {
+        if create_checkpoint {
+            // Commit is already done, so we can just scan the catalog for the state.
+            //
+            // NOTE: There can only be a single transaction in this section because the checkpoint handle holds the
+            //       transaction lock. Therefore we don't need to worry about concurrent modifications of
+            //       preserved chunks.
+            if let Err(e) = ckpt_handle
+                .create_checkpoint(checkpoint_data_from_catalog(&catalog))
+                .await
+            {
                 warn!(%e, "cannot create catalog checkpoint");
 
-                // still mark chunk as persisted
-                let mut guard = chunk.write();
-                if let Err(e) = guard.set_written_to_object_store(parquet_chunk) {
-                    panic!("Chunk written but cannot mark as written {}", e);
-                }
-            }
-            Err(e @ parquet_file::catalog::CommitError::CommitFailed { .. }) => {
-                warn!(%e, "cannot create catalog transaction");
-
-                // do NOT mark chunk as persisted
-                return Err(Error::CommitError { source: e });
+                // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed
+                // (both in-mem and within the preserved catalog).
             }
         }
     }

From 67508094b404931569509546867bc8df241158a0 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 22 Jun 2021 18:15:54 +0200
Subject: [PATCH 3/4] fix: double ref

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
---
 parquet_file/src/catalog.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index f89ecb2c83..1875a8877c 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -1396,7 +1396,7 @@ pub mod test_helpers {
         state.remove(path.clone()).unwrap();
         state
             .add(
-                Arc::clone(&&object_store),
+                Arc::clone(&object_store),
                 CatalogParquetInfo {
                     path: path.clone(),
                     metadata: Arc::new(metadata),

From e36b6f9c7a662f87e976e61fd409a4bfe69a09f9 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 22 Jun 2021 18:30:27 +0200
Subject: [PATCH 4/4] docs: fix intra-doc link

---
 parquet_file/src/catalog.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 1875a8877c..d3de834439 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -799,8 +799,8 @@ impl OpenTransaction {
     /// This deserializes the action state and passes it to the correct method in [`CatalogState`].
     ///
     /// Note that this method is primarily for replaying transactions and will NOT append the given action to the
-    /// current transaction. If you also want to store the given action (e.g. during an in-progress transaction), use
-    /// [`handle_action_and_record`](Self::handle_action_and_record).
+    /// current transaction. If you want to store the given action (e.g. during an in-progress transaction), use
+    /// [`record_action`](Self::record_action).
     fn handle_action<S>(
         state: &mut S,
         action: &proto::transaction::action::Action,
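
For reference, the split commit/checkpoint flow introduced in PATCH 2/4 is used roughly
like this (a minimal sketch, not part of the diffs above; the helper name, the checkpoint
interval of 10, and the exact import paths are illustrative assumptions based on the API
shown in the patches):

    use object_store::path::parsed::DirsAndFileName;
    use observability_deps::tracing::warn;
    use parquet_file::catalog::{CheckpointData, PreservedCatalog, Result};
    use parquet_file::metadata::IoxParquetMetaData;

    /// Hypothetical caller: add one parquet file, commit, and checkpoint every
    /// 10th revision. `checkpoint_data` must already INCLUDE the file added here.
    async fn persist_and_maybe_checkpoint(
        catalog: &PreservedCatalog,
        path: &DirsAndFileName,
        metadata: &IoxParquetMetaData,
        checkpoint_data: CheckpointData,
    ) -> Result<()> {
        let mut transaction = catalog.open_transaction().await;
        transaction.add_parquet(path, metadata)?;

        // Step 1: commit. If this fails, nothing recorded with the handle is
        // preserved and the error is propagated as-is.
        let ckpt_handle = transaction.commit().await?;

        // Step 2 (optional): checkpoint. The handle still holds the transaction
        // lock, so the state captured for the checkpoint cannot race with a
        // concurrent transaction. A failure here is non-fatal because the
        // transaction above is already durable; the checkpoint only speeds up
        // replay and allows pruning the history.
        if ckpt_handle.revision_counter() % 10 == 0 {
            if let Err(e) = ckpt_handle.create_checkpoint(checkpoint_data).await {
                warn!(%e, "cannot create catalog checkpoint");
            }
        }

        Ok(())
    }

This mirrors the usage in server/src/db.rs and parquet_file/src/rebuild.rs above.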