diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs index c0e227a767..9577d03bdf 100644 --- a/parquet_file/src/catalog.rs +++ b/parquet_file/src/catalog.rs @@ -6,6 +6,7 @@ use std::{ }, convert::TryInto, fmt::{Debug, Display}, + marker::PhantomData, num::TryFromIntError, str::FromStr, sync::Arc, @@ -254,11 +255,6 @@ pub trait CatalogState { /// Remove parquet file from state. fn remove(tstate: &mut Self::TransactionState, path: DirsAndFileName) -> Result<()>; - - /// List all Parquet files that are currently (i.e. by the current version) tracked by the catalog. - /// - /// If a file was once [added](Self::add) but later [removed](Self::remove) it MUST NOT appear in the result. - fn files(&self) -> HashMap>; } /// Find last transaction-start-timestamp. @@ -290,15 +286,6 @@ pub async fn find_last_transaction_timestamp( Ok(res) } -/// Inner mutable part of the preserved catalog. -struct PreservedCatalogInner -where - S: CatalogState, -{ - previous_tkey: Option, - state: Arc, -} - /// In-memory view of the preserved catalog. pub struct PreservedCatalog where @@ -322,12 +309,15 @@ where // 9. release semaphore // // Note that there can only be a single transaction that acquires the semaphore. - inner: RwLock>, + previous_tkey: RwLock>, transaction_semaphore: Semaphore, object_store: Arc, server_id: ServerId, db_name: String, + + // temporary measure to keep the API a bit more stable + phantom: PhantomData, } /// Deletes catalog. @@ -371,31 +361,28 @@ where server_id: ServerId, db_name: impl Into + Send, state_data: S::EmptyInput, - ) -> Result { + ) -> Result<(Self, Arc)> { let db_name = db_name.into(); if Self::exists(&object_store, server_id, &db_name).await? 
{ return Err(Error::AlreadyExists {}); } - - let inner = PreservedCatalogInner { - previous_tkey: None, - state: Arc::new(S::new_empty(&db_name, state_data)), - }; + let state = Arc::new(S::new_empty(&db_name, state_data)); let catalog = Self { - inner: RwLock::new(inner), + previous_tkey: RwLock::new(None), transaction_semaphore: Semaphore::new(1), object_store, server_id, db_name, + phantom: PhantomData, }; // add empty transaction - let transaction = catalog.open_transaction().await; - transaction.commit(false).await?; + let transaction = catalog.open_transaction(state).await; + let state = transaction.commit(None).await?; - Ok(catalog) + Ok((catalog, state)) } /// Load existing catalog from store, if it exists. @@ -407,7 +394,7 @@ where server_id: ServerId, db_name: String, state_data: S::EmptyInput, - ) -> Result> { + ) -> Result)>> { // parse all paths into revisions let mut transactions: HashMap = HashMap::new(); let mut max_revision = None; @@ -462,7 +449,7 @@ where } // setup empty state - let mut state = Arc::new(CatalogState::new_empty(&db_name, state_data)); + let mut state = Arc::new(S::new_empty(&db_name, state_data)); let mut last_tkey = None; // detect replay start @@ -498,18 +485,17 @@ where last_tkey = Some(tkey); } - let inner = PreservedCatalogInner { - previous_tkey: last_tkey, + Ok(Some(( + Self { + previous_tkey: RwLock::new(last_tkey), + transaction_semaphore: Semaphore::new(1), + object_store, + server_id, + db_name, + phantom: PhantomData, + }, state, - }; - - Ok(Some(Self { - inner: RwLock::new(inner), - transaction_semaphore: Semaphore::new(1), - object_store, - server_id, - db_name, - })) + ))) } /// Open a new transaction. @@ -518,26 +504,24 @@ where /// transaction handle is dropped. The newly created transaction will contain the state after `await` (esp. /// post-blocking). This system is fair, which means that transactions are given out in the order they were /// requested. 
- pub async fn open_transaction(&self) -> TransactionHandle<'_, S> { - self.open_transaction_with_uuid(Uuid::new_v4()).await + pub async fn open_transaction(&self, state: Arc) -> TransactionHandle<'_, S> { + self.open_transaction_with_uuid(Uuid::new_v4(), state).await } /// Crate-private API to open an transaction with a specified UUID. Should only be used for catalog rebuilding or /// with a fresh V4-UUID! - pub(crate) async fn open_transaction_with_uuid(&self, uuid: Uuid) -> TransactionHandle<'_, S> { - TransactionHandle::new(self, uuid).await - } - - /// Return current state. - pub fn state(&self) -> Arc { - Arc::clone(&self.inner.read().state) + pub(crate) async fn open_transaction_with_uuid( + &self, + uuid: Uuid, + state: Arc, + ) -> TransactionHandle<'_, S> { + TransactionHandle::new(self, uuid, state).await } /// Get latest revision counter. pub fn revision_counter(&self) -> u64 { - self.inner + self.previous_tkey .read() - .previous_tkey .clone() .map(|tkey| tkey.revision_counter) .expect("catalog should have at least an empty transaction") @@ -835,14 +819,14 @@ where S: CatalogState + Send + Sync, { /// Private API to create new transaction, users should always use [`PreservedCatalog::open_transaction`]. - fn new(catalog_inner: &PreservedCatalogInner, uuid: Uuid) -> Self { - let (revision_counter, previous_uuid) = match &catalog_inner.previous_tkey { + fn new(previous_tkey: &Option, uuid: Uuid, state: Arc) -> Self { + let (revision_counter, previous_uuid) = match previous_tkey { Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()), None => (0, String::new()), }; Self { - tstate: S::transaction_begin(&catalog_inner.state), + tstate: S::transaction_begin(&state), proto: proto::Transaction { actions: vec![], version: TRANSACTION_VERSION, @@ -917,16 +901,19 @@ where } /// Commit to mutable catalog and return previous transaction key. 
- fn commit(self, catalog_inner: &mut PreservedCatalogInner) -> Option { + fn commit( + self, + previous_tkey: &mut Option, + ) -> (Option, Arc) { let mut tkey = Some(self.tkey()); - catalog_inner.state = S::transaction_end(self.tstate, TransactionEnd::Commit); - std::mem::swap(&mut catalog_inner.previous_tkey, &mut tkey); - tkey + let state = S::transaction_end(self.tstate, TransactionEnd::Commit); + std::mem::swap(previous_tkey, &mut tkey); + (tkey, state) } /// Abort transaction - fn abort(self, catalog_inner: &mut PreservedCatalogInner) { - catalog_inner.state = S::transaction_end(self.tstate, TransactionEnd::Abort); + fn abort(self) -> Arc { + S::transaction_end(self.tstate, TransactionEnd::Abort) } async fn store( @@ -1026,6 +1013,18 @@ where } } +/// Structure that holds all information required to create a checkpoint. +/// +/// Note that while checkpoints are addressed using the same schema as we use for transactions (revision counter, UUID), +/// they contain the changes at the end of (aka including) the transaction they refer to. +#[derive(Debug)] +pub struct CheckpointData { + /// List of all Parquet files that are currently (i.e. by the current version) tracked by the catalog. + /// + /// If a file was once added but later removed it MUST NOT appear in the result. + pub files: HashMap>, +} + /// Handle for an open uncommitted transaction. /// /// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning. 
@@ -1047,19 +1046,23 @@ impl<'c, S> TransactionHandle<'c, S> where S: CatalogState + Send + Sync, { - async fn new(catalog: &'c PreservedCatalog, uuid: Uuid) -> TransactionHandle<'c, S> { + async fn new( + catalog: &'c PreservedCatalog, + uuid: Uuid, + state: Arc, + ) -> TransactionHandle<'c, S> { // first acquire semaphore (which is only being used for transactions), then get state lock let permit = catalog .transaction_semaphore .acquire() .await .expect("semaphore should not be closed"); - let inner_guard = catalog.inner.write(); + let previous_tkey_guard = catalog.previous_tkey.write(); - let transaction = OpenTransaction::new(&inner_guard, uuid); + let transaction = OpenTransaction::new(&previous_tkey_guard, uuid, state); // free state for readers again - drop(inner_guard); + drop(previous_tkey_guard); let tkey = transaction.tkey(); info!(?tkey, "transaction started"); @@ -1071,6 +1074,15 @@ where } } + /// Get current transaction state. + pub fn tstate(&self) -> &S::TransactionState { + &self + .transaction + .as_ref() + .expect("No transaction in progress?") + .tstate + } + /// Get revision counter for this transaction. pub fn revision_counter(&self) -> u64 { self.transaction @@ -1093,10 +1105,13 @@ where /// /// This will first commit to object store and then to the in-memory state. /// - /// If `create_checkpoint` is set this will also create a checkpoint at the end of the commit. Note that if the + /// # Checkpointing + /// If `checkpoint_data` is passed this will also create a checkpoint at the end of the commit. Note that if the /// checkpoint creation fails, the commit will still be treated as completed since the checkpoint is a mere /// optimization to speed up transaction replay and allow to prune the history. - pub async fn commit(mut self, create_checkpoint: bool) -> Result<()> { + /// + /// Note that `checkpoint_data` must contain the state INCLUDING the to-be-committed transaction. 
+ pub async fn commit(mut self, checkpoint_data: Option) -> Result> { let t = std::mem::take(&mut self.transaction) .expect("calling .commit on a closed transaction?!"); let tkey = t.tkey(); @@ -1118,17 +1133,18 @@ where // maybe create a checkpoint // IMPORTANT: Create the checkpoint AFTER commiting the transaction to object store and to the in-memory state. // Checkpoints are an optional optimization and are not required to materialize a transaction. - if create_checkpoint { + if let Some(checkpoint_data) = checkpoint_data { // NOTE: `inner_guard` will not be re-used here since it is a strong write lock and the checkpoint creation // only needs a read lock. - self.create_checkpoint(tkey, previous_tkey, state).await?; + self.create_checkpoint(tkey, previous_tkey, checkpoint_data) + .await?; } - Ok(()) + Ok(state) } Err(e) => { warn!(?tkey, "failure while writing transaction, aborting"); - self.abort_inner(t); + t.abort(); Err(e) } } @@ -1142,32 +1158,23 @@ where /// - rustc seems to fold the guard into the async generator state even when we `drop` it quickly, making the /// resulting future `!Send`. However tokio requires our futures to be `Send`. 
fn commit_inner(&self, t: OpenTransaction) -> (Option, Arc) { - let mut inner_guard = self.catalog.inner.write(); - let previous_tkey = t.commit(&mut inner_guard); - let state = Arc::clone(&inner_guard.state); - - (previous_tkey, state) - } - - fn abort_inner(&self, t: OpenTransaction) { - let mut inner_guard = self.catalog.inner.write(); - t.abort(&mut inner_guard); + let mut previous_tkey_guard = self.catalog.previous_tkey.write(); + t.commit(&mut previous_tkey_guard) } async fn create_checkpoint( &self, tkey: TransactionKey, previous_tkey: Option, - state: Arc, + checkpoint_data: CheckpointData, ) -> Result<()> { - let files = state.files(); let object_store = self.catalog.object_store(); let server_id = self.catalog.server_id(); let db_name = self.catalog.db_name(); // sort by key (= path) for deterministic output let files = { - let mut tmp: Vec<_> = files.into_iter().collect(); + let mut tmp: Vec<_> = checkpoint_data.files.into_iter().collect(); tmp.sort_by_key(|(path, _metadata)| path.clone()); tmp }; @@ -1211,10 +1218,10 @@ where } /// Abort transaction w/o commit. - pub fn abort(mut self) { + pub fn abort(mut self) -> Arc { let t = std::mem::take(&mut self.transaction) .expect("calling .commit on a closed transaction?!"); - self.abort_inner(t); + t.abort() } /// Add a new parquet file to the catalog. @@ -1272,7 +1279,7 @@ where fn drop(&mut self) { if let Some(t) = self.transaction.take() { warn!(?self, "dropped uncommitted transaction, calling abort"); - self.abort_inner(t); + t.abort(); } } } @@ -1292,10 +1299,19 @@ pub mod test_helpers { pub parquet_files: HashMap>, } + impl TestCatalogState { + /// Simple way to create [`CheckpointData`]. 
+ pub fn checkpoint_data(&self) -> CheckpointData { + CheckpointData { + files: self.parquet_files.clone(), + } + } + } + #[derive(Debug)] pub struct TState { - old: Arc, - new: TestCatalogState, + pub old: Arc, + pub new: TestCatalogState, } impl CatalogState for TestCatalogState { @@ -1354,10 +1370,6 @@ pub mod test_helpers { Ok(()) } - - fn files(&self) -> HashMap> { - self.parquet_files.clone() - } } /// Break preserved catalog by moving one of the transaction files into a weird unknown version. @@ -1387,22 +1399,24 @@ pub mod test_helpers { where S: CatalogState + Send + Sync, { - let guard = catalog.inner.read(); + let guard = catalog.previous_tkey.read(); guard - .previous_tkey .as_ref() .expect("should have at least a single transaction") .clone() } /// Torture-test implementations for [`CatalogState`]. - pub async fn assert_catalog_state_implementation(state_data: S::EmptyInput) + /// + /// A function to extract [`CheckpointData`] from the [`CatalogState`] must be provided. + pub async fn assert_catalog_state_implementation(state_data: S::EmptyInput, f: F) where S: CatalogState + Send + Sync, + F: Fn(&S) -> CheckpointData + Send, { // empty state let object_store = make_object_store(); - let catalog = PreservedCatalog::::new_empty( + let (catalog, mut state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), ServerId::try_from(1).unwrap(), "db1".to_string(), @@ -1411,12 +1425,12 @@ pub mod test_helpers { .await .unwrap(); let mut expected = HashMap::new(); - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // add files let mut chunk_id_watermark = 5; { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; for chunk_id in 0..chunk_id_watermark { let path = parsed_path!(format!("chunk_{}", chunk_id).as_ref()); @@ -1425,25 +1439,25 @@ pub mod test_helpers { expected.insert(path, Arc::new(metadata)); } - 
transaction.commit(true).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // remove files { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let path = parsed_path!("chunk_1"); transaction.remove_parquet(&path).unwrap(); expected.remove(&path); - transaction.commit(true).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // add and remove in the same transaction { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let (_, metadata) = @@ -1452,26 +1466,26 @@ pub mod test_helpers { transaction.remove_parquet(&path).unwrap(); chunk_id_watermark += 1; - transaction.commit(true).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // remove and add in the same transaction { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let path = parsed_path!("chunk_2"); let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(2)).await; transaction.remove_parquet(&path).unwrap(); transaction.add_parquet(&path, &metadata).unwrap(); - transaction.commit(true).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // add, remove, add in the same transaction { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let path = 
parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let (_, metadata) = @@ -1482,13 +1496,13 @@ pub mod test_helpers { expected.insert(path, Arc::new(metadata)); chunk_id_watermark += 1; - transaction.commit(true).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // remove, add, remove in same transaction { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let path = parsed_path!("chunk_2"); let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(2)).await; @@ -1497,13 +1511,13 @@ pub mod test_helpers { transaction.remove_parquet(&path).unwrap(); expected.remove(&path); - transaction.commit(true).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // error handling, no real opt { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; // already exists (should also not change the metadata) let path = parsed_path!("chunk_0"); @@ -1517,13 +1531,13 @@ pub mod test_helpers { assert!(matches!(err, Error::ParquetFileDoesNotExist { .. 
})); chunk_id_watermark += 1; - transaction.commit(true).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // error handling, still something works { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; // already exists (should also not change the metadata) let path = parsed_path!("chunk_0"); @@ -1558,13 +1572,13 @@ pub mod test_helpers { let err = transaction.remove_parquet(&path).unwrap_err(); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); - transaction.commit(true).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // transaction aborting { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(Arc::clone(&state)).await; // add let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); @@ -1585,11 +1599,11 @@ pub mod test_helpers { transaction.remove_parquet(&path).unwrap(); chunk_id_watermark += 1; } - assert_files_eq(&catalog.state().files(), &expected); + assert_checkpoint(state.as_ref(), &f, &expected); // transaction aborting w/ errors { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(Arc::clone(&state)).await; // already exists (should also not change the metadata) let path = parsed_path!("chunk_0"); @@ -1609,17 +1623,22 @@ pub mod test_helpers { } /// Assert that tracked files and their linked metadata are equal. 
- fn assert_files_eq( - actual: &HashMap>, - expected: &HashMap>, - ) { - let sorted_keys_actual = get_sorted_keys(actual); - let sorted_keys_expected = get_sorted_keys(expected); + fn assert_checkpoint( + state: &S, + f: &F, + expected_files: &HashMap>, + ) where + F: Fn(&S) -> CheckpointData, + { + let actual_files = f(state).files; + + let sorted_keys_actual = get_sorted_keys(&actual_files); + let sorted_keys_expected = get_sorted_keys(expected_files); assert_eq!(sorted_keys_actual, sorted_keys_expected); for k in sorted_keys_actual { - let md_actual = &actual[&k]; - let md_expected = &expected[&k]; + let md_actual = &actual_files[&k]; + let md_expected = &expected_files[&k]; let iox_md_actual = md_actual.read_iox_metadata().unwrap(); let iox_md_expected = md_expected.read_iox_metadata().unwrap(); @@ -1650,7 +1669,7 @@ pub mod test_helpers { #[cfg(test)] mod tests { - use std::{num::NonZeroU32, ops::Deref}; + use std::num::NonZeroU32; use crate::test_utils::{chunk_addr, make_metadata, make_object_store}; use object_store::parsed_path; @@ -1827,7 +1846,7 @@ mod tests { assert_single_catalog_inmem_works(&object_store, server_id, db_name).await; // break transaction file - let catalog = PreservedCatalog::::load( + let (catalog, _state) = PreservedCatalog::::load( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -2155,7 +2174,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_debug() { let object_store = make_object_store(); - let catalog = PreservedCatalog::::new_empty( + let (catalog, state) = PreservedCatalog::::new_empty( object_store, make_server_id(), "db1".to_string(), @@ -2163,7 +2182,7 @@ mod tests { ) .await .unwrap(); - let mut t = catalog.open_transaction().await; + let mut t = catalog.open_transaction(state).await; // open transaction t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); @@ -2564,7 +2583,7 @@ mod tests { let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await; 
// re-open catalog - let catalog = PreservedCatalog::load( + let (catalog, mut state) = PreservedCatalog::::load( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -2576,20 +2595,22 @@ mod tests { // create empty transaction w/ checkpoint (the delta transaction file is not required for catalog loading) { - let transaction = catalog.open_transaction().await; - transaction.commit(true).await.unwrap(); + let transaction = catalog.open_transaction(state).await; + let checkpoint_data = Some(transaction.tstate().new.checkpoint_data()); + state = transaction.commit(checkpoint_data).await.unwrap(); } - trace.record(&catalog, false); + trace.record(&catalog, &state, false); // create another transaction on-top that adds a file (this transaction will be required to load the full state) { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; transaction .add_parquet(&parsed_path!("last_one"), &metadata) .unwrap(); - transaction.commit(true).await.unwrap(); + let checkpoint_data = Some(transaction.tstate().new.checkpoint_data()); + state = transaction.commit(checkpoint_data).await.unwrap(); } - trace.record(&catalog, false); + trace.record(&catalog, &state, false); // close catalog again drop(catalog); @@ -2611,7 +2632,7 @@ mod tests { } // load catalog from store and check replayed state - let catalog = PreservedCatalog::load( + let (catalog, state) = PreservedCatalog::load( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -2625,7 +2646,7 @@ mod tests { trace.tkeys.last().unwrap().revision_counter ); assert_catalog_parquet_files( - &catalog, + &state, &get_catalog_parquet_files(trace.states.last().unwrap()), ); } @@ -2643,10 +2664,10 @@ mod tests { /// Assert that set of parquet files tracked by a catalog are identical to the given sorted list. 
fn assert_catalog_parquet_files( - catalog: &PreservedCatalog, + state: &TestCatalogState, expected: &[(String, IoxParquetMetaData)], ) { - let actual = get_catalog_parquet_files(&catalog.state()); + let actual = get_catalog_parquet_files(state); for ((actual_path, actual_md), (expected_path, expected_md)) in actual.iter().zip(expected.iter()) { @@ -2725,10 +2746,15 @@ mod tests { } } - fn record(&mut self, catalog: &PreservedCatalog, aborted: bool) { + fn record( + &mut self, + catalog: &PreservedCatalog, + state: &TestCatalogState, + aborted: bool, + ) { self.tkeys - .push(catalog.inner.read().previous_tkey.clone().unwrap()); - self.states.push(catalog.state().deref().clone()); + .push(catalog.previous_tkey.read().clone().unwrap()); + self.states.push(state.clone()); self.post_timestamps.push(Utc::now()); self.aborted.push(aborted); } @@ -2739,7 +2765,7 @@ mod tests { server_id: ServerId, db_name: &str, ) -> TestTrace { - let catalog = PreservedCatalog::new_empty( + let (catalog, mut state) = PreservedCatalog::new_empty( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -2757,12 +2783,12 @@ mod tests { // empty catalog has no data assert_eq!(catalog.revision_counter(), 0); - assert_catalog_parquet_files(&catalog, &[]); - trace.record(&catalog, false); + assert_catalog_parquet_files(&state, &[]); + trace.record(&catalog, &state, false); // fill catalog with examples { - let mut t = catalog.open_transaction().await; + let mut t = catalog.open_transaction(state).await; t.add_parquet(&parsed_path!("test1"), &metadata1).unwrap(); t.add_parquet(&parsed_path!(["sub1"], "test1"), &metadata2) @@ -2772,11 +2798,11 @@ mod tests { t.add_parquet(&parsed_path!(["sub2"], "test1"), &metadata1) .unwrap(); - t.commit(false).await.unwrap(); + state = t.commit(None).await.unwrap(); } assert_eq!(catalog.revision_counter(), 1); assert_catalog_parquet_files( - &catalog, + &state, &[ ("sub1/test1".to_string(), metadata2.clone()), ("sub1/test2".to_string(), 
metadata2.clone()), @@ -2784,11 +2810,11 @@ mod tests { ("test1".to_string(), metadata1.clone()), ], ); - trace.record(&catalog, false); + trace.record(&catalog, &state, false); // modify catalog with examples { - let mut t = catalog.open_transaction().await; + let mut t = catalog.open_transaction(state).await; // "real" modifications t.add_parquet(&parsed_path!("test4"), &metadata1).unwrap(); @@ -2802,11 +2828,11 @@ mod tests { t.remove_parquet(&parsed_path!("test1")) .expect_err("removing twice should error"); - t.commit(false).await.unwrap(); + state = t.commit(None).await.unwrap(); } assert_eq!(catalog.revision_counter(), 2); assert_catalog_parquet_files( - &catalog, + &state, &[ ("sub1/test1".to_string(), metadata2.clone()), ("sub1/test2".to_string(), metadata2.clone()), @@ -2814,11 +2840,11 @@ mod tests { ("test4".to_string(), metadata1.clone()), ], ); - trace.record(&catalog, false); + trace.record(&catalog, &state, false); // uncommitted modifications have no effect { - let mut t = catalog.open_transaction().await; + let mut t = catalog.open_transaction(Arc::clone(&state)).await; t.add_parquet(&parsed_path!("test5"), &metadata1).unwrap(); t.remove_parquet(&parsed_path!(["sub1"], "test2")).unwrap(); @@ -2827,7 +2853,7 @@ mod tests { } assert_eq!(catalog.revision_counter(), 2); assert_catalog_parquet_files( - &catalog, + &state, &[ ("sub1/test1".to_string(), metadata2.clone()), ("sub1/test2".to_string(), metadata2.clone()), @@ -2835,7 +2861,7 @@ mod tests { ("test4".to_string(), metadata1.clone()), ], ); - trace.record(&catalog, true); + trace.record(&catalog, &state, true); trace } @@ -2923,7 +2949,7 @@ mod tests { assert_single_catalog_inmem_works(&object_store, make_server_id(), "db1").await; // break - let catalog = PreservedCatalog::::load( + let (catalog, _state) = PreservedCatalog::::load( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -2999,7 +3025,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_revision_counter() { let 
object_store = make_object_store(); - let catalog = PreservedCatalog::::new_empty( + let (catalog, state) = PreservedCatalog::::new_empty( object_store, make_server_id(), "db1".to_string(), @@ -3007,7 +3033,7 @@ mod tests { ) .await .unwrap(); - let t = catalog.open_transaction().await; + let t = catalog.open_transaction(state).await; assert_eq!(t.revision_counter(), 1); } @@ -3015,7 +3041,7 @@ mod tests { #[tokio::test] async fn test_transaction_handle_uuid() { let object_store = make_object_store(); - let catalog = PreservedCatalog::::new_empty( + let (catalog, state) = PreservedCatalog::::new_empty( object_store, make_server_id(), "db1".to_string(), @@ -3023,7 +3049,7 @@ mod tests { ) .await .unwrap(); - let mut t = catalog.open_transaction().await; + let mut t = catalog.open_transaction(state).await; t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); assert_eq!(t.uuid(), Uuid::nil()); @@ -3180,7 +3206,7 @@ mod tests { let db_name = "db1"; let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await; - let catalog = PreservedCatalog::load( + let (catalog, mut state) = PreservedCatalog::::load( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -3192,10 +3218,11 @@ mod tests { // create empty transaction w/ checkpoint { - let transaction = catalog.open_transaction().await; - transaction.commit(true).await.unwrap(); + let transaction = catalog.open_transaction(state).await; + let checkpoint_data = Some(transaction.tstate().new.checkpoint_data()); + state = transaction.commit(checkpoint_data).await.unwrap(); } - trace.record(&catalog, false); + trace.record(&catalog, &state, false); // delete transaction files for (aborted, tkey) in trace.aborted.iter().zip(trace.tkeys.iter()) { @@ -3246,7 +3273,7 @@ mod tests { let trace = assert_single_catalog_inmem_works(object_store, server_id, db_name).await; // load catalog from store and check replayed state - let catalog = + let (catalog, state) = 
PreservedCatalog::load(Arc::clone(object_store), server_id, db_name.to_string(), ()) .await .unwrap() @@ -3256,7 +3283,7 @@ mod tests { trace.tkeys.last().unwrap().revision_counter ); assert_catalog_parquet_files( - &catalog, + &state, &get_catalog_parquet_files(trace.states.last().unwrap()), ); } @@ -3273,7 +3300,7 @@ mod tests { .unwrap() ); - let catalog = PreservedCatalog::::new_empty( + let (catalog, state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -3283,7 +3310,7 @@ mod tests { .unwrap(); // delete transaction file - let tkey = catalog.inner.read().previous_tkey.clone().unwrap(); + let tkey = catalog.previous_tkey.read().clone().unwrap(); let path = file_path( &object_store, server_id, @@ -3295,12 +3322,13 @@ mod tests { // create empty transaction w/ checkpoint { - let transaction = catalog.open_transaction().await; - transaction.commit(true).await.unwrap(); + let transaction = catalog.open_transaction(state).await; + let checkpoint_data = Some(transaction.tstate().new.checkpoint_data()); + transaction.commit(checkpoint_data).await.unwrap(); } // delete transaction file - let tkey = catalog.inner.read().previous_tkey.clone().unwrap(); + let tkey = catalog.previous_tkey.read().clone().unwrap(); let path = file_path( &object_store, server_id, @@ -3330,6 +3358,10 @@ mod tests { #[tokio::test] async fn test_catalog_state() { - assert_catalog_state_implementation::(()).await; + assert_catalog_state_implementation::( + (), + TestCatalogState::checkpoint_data, + ) + .await; } } diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index cce4c4afd5..d41c480679 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -1,13 +1,12 @@ //! Methods to cleanup the object store. 
use std::{ - collections::{HashMap, HashSet}, + collections::HashSet, sync::{Arc, Mutex}, }; use crate::{ catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog, TransactionEnd}, - metadata::IoxParquetMetaData, storage::data_location, }; use futures::TryStreamExt; @@ -42,6 +41,7 @@ pub type Result = std::result::Result; /// use `max_files` which will limit the number of files to delete in this cleanup round. pub async fn cleanup_unreferenced_parquet_files( catalog: &PreservedCatalog, + state: Arc, max_files: usize, ) -> Result<()> where @@ -49,14 +49,14 @@ where { // Create a transaction to prevent parallel modifications of the catalog. This avoids that we delete files there // that are about to get added to the catalog. - let transaction = catalog.open_transaction().await; + let transaction = catalog.open_transaction(state).await; let store = catalog.object_store(); let server_id = catalog.server_id(); let db_name = catalog.db_name(); let all_known = { // replay catalog transactions to track ALL (even dropped) files that are referenced - let catalog = PreservedCatalog::::load( + let (_catalog, state) = PreservedCatalog::::load( Arc::clone(&store), server_id, db_name.to_string(), @@ -65,12 +65,9 @@ where .await .context(CatalogLoadError)? 
.expect("catalog gone while reading it?"); - catalog - .state() - .files - .lock() - .expect("lock poissened?") - .clone() + + let file_guard = state.files.lock().expect("lock poissened?"); + file_guard.clone() }; let prefix = data_location(&store, server_id, db_name); @@ -165,10 +162,6 @@ impl CatalogState for TracerCatalogState { // Do NOT remove the file since we still need it for time travel Ok(()) } - - fn files(&self) -> HashMap> { - unimplemented!("File tracking not implemented for TracerCatalogState") - } } #[cfg(test)] @@ -191,7 +184,7 @@ mod tests { let server_id = make_server_id(); let db_name = "db1"; - let catalog = PreservedCatalog::::new_empty( + let (catalog, state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name, @@ -201,7 +194,7 @@ mod tests { .unwrap(); // run clean-up - cleanup_unreferenced_parquet_files(&catalog, 1_000) + cleanup_unreferenced_parquet_files(&catalog, state, 1_000) .await .unwrap(); } @@ -212,7 +205,7 @@ mod tests { let server_id = make_server_id(); let db_name = db_name(); - let catalog = PreservedCatalog::::new_empty( + let (catalog, mut state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name, @@ -225,7 +218,7 @@ mod tests { let mut paths_keep = vec![]; let mut paths_delete = vec![]; { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; // an ordinary tracked parquet file => keep let (path, md) = make_metadata(&object_store, "foo", chunk_addr(1)).await; @@ -249,11 +242,11 @@ mod tests { let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(3)).await; paths_delete.push(path.display()); - transaction.commit(false).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } // run clean-up - cleanup_unreferenced_parquet_files(&catalog, 1_000) + cleanup_unreferenced_parquet_files(&catalog, state, 1_000) .await .unwrap(); @@ -273,7 +266,7 @@ mod tests { let server_id = 
make_server_id(); let db_name = db_name(); - let catalog = PreservedCatalog::::new_empty( + let (catalog, state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name, @@ -286,17 +279,17 @@ mod tests { for i in 0..100 { let (path, _) = tokio::join!( async { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(Arc::clone(&state)).await; let (path, md) = make_metadata(&object_store, "foo", chunk_addr(i)).await; transaction.add_parquet(&path.clone().into(), &md).unwrap(); - transaction.commit(false).await.unwrap(); + transaction.commit(None).await.unwrap(); path.display() }, async { - cleanup_unreferenced_parquet_files(&catalog, 1_000) + cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 1_000) .await .unwrap(); }, @@ -313,7 +306,7 @@ mod tests { let server_id = make_server_id(); let db_name = db_name(); - let catalog = PreservedCatalog::::new_empty( + let (catalog, state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name, @@ -330,7 +323,7 @@ mod tests { } // run clean-up - cleanup_unreferenced_parquet_files(&catalog, 2) + cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 2) .await .unwrap(); @@ -340,7 +333,7 @@ mod tests { assert_eq!(leftover.len(), 1); // run clean-up again - cleanup_unreferenced_parquet_files(&catalog, 2) + cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 2) .await .unwrap(); diff --git a/parquet_file/src/rebuild.rs b/parquet_file/src/rebuild.rs index 66d123e129..5cb983532f 100644 --- a/parquet_file/src/rebuild.rs +++ b/parquet_file/src/rebuild.rs @@ -15,7 +15,7 @@ use snafu::{ResultExt, Snafu}; use uuid::Uuid; use crate::{ - catalog::{CatalogState, PreservedCatalog}, + catalog::{CatalogState, CheckpointData, PreservedCatalog}, metadata::{IoxMetadata, IoxParquetMetaData}, }; #[derive(Debug, Snafu)] @@ -96,57 +96,63 @@ pub async fn rebuild_catalog( db_name: N, catalog_empty_input: S::EmptyInput, 
ignore_metadata_read_failure: bool, -) -> Result> +) -> Result<(PreservedCatalog, Arc)> where S: CatalogState + Send + Sync, N: Into + Send, { // collect all revisions from parquet files - let revisions = + let mut revisions = collect_revisions(&object_store, search_location, ignore_metadata_read_failure).await?; // create new empty catalog - let catalog = + let (catalog, mut state) = PreservedCatalog::::new_empty(object_store, server_id, db_name, catalog_empty_input) .await .context(NewEmptyFailure)?; + // trace all files for final checkpoint + let mut collected_files = HashMap::new(); + // simulate all transactions - if let Some(max_revision) = revisions.keys().max() { - for revision_counter in 1..=*max_revision { + if let Some(max_revision) = revisions.keys().max().cloned() { + for revision_counter in 1..=max_revision { assert_eq!( catalog.revision_counter() + 1, revision_counter, "revision counter during transaction simulation out-of-sync" ); - let create_checkpoint = revision_counter == *max_revision; - if let Some((uuid, entries)) = revisions.get(&revision_counter) { + if let Some((uuid, entries)) = revisions.remove(&revision_counter) { // we have files for this particular transaction - let mut transaction = catalog.open_transaction_with_uuid(*uuid).await; + let mut transaction = catalog.open_transaction_with_uuid(uuid, state).await; for (path, metadata) in entries { let path: DirsAndFileName = path.clone().into(); + transaction - .add_parquet(&path, metadata) + .add_parquet(&path, &metadata) .context(FileRecordFailure)?; + collected_files.insert(path, Arc::new(metadata)); } - transaction - .commit(create_checkpoint) + + let checkpoint_data = (revision_counter == max_revision).then(|| CheckpointData { + files: collected_files.clone(), + }); + state = transaction + .commit(checkpoint_data) .await .context(CommitFailure)?; } else { // we do not have any files for this transaction (there might have been other actions though or it was // an empty transaction) => 
create new empty transaction - let transaction = catalog.open_transaction().await; - transaction - .commit(create_checkpoint) - .await - .context(CommitFailure)?; + // Note that this can never be the last transaction, so we don't need to create a checkpoint here. + let transaction = catalog.open_transaction(state).await; + state = transaction.commit(None).await.context(CommitFailure)?; } } } - Ok(catalog) + Ok((catalog, state)) } /// Collect all files under the given locations. @@ -273,7 +279,7 @@ mod tests { let db_name = "db1"; // build catalog with some data - let catalog = PreservedCatalog::::new_empty( + let (catalog, mut state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name, @@ -282,7 +288,7 @@ mod tests { .await .unwrap(); { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let (path, md) = create_parquet_file( &object_store, @@ -306,15 +312,15 @@ mod tests { .await; transaction.add_parquet(&path, &md).unwrap(); - transaction.commit(false).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } { // empty transaction - let transaction = catalog.open_transaction().await; - transaction.commit(false).await.unwrap(); + let transaction = catalog.open_transaction(state).await; + state = transaction.commit(None).await.unwrap(); } { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let (path, md) = create_parquet_file( &object_store, @@ -327,12 +333,11 @@ mod tests { .await; transaction.add_parquet(&path, &md).unwrap(); - transaction.commit(false).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } // store catalog state let paths_expected = { - let state = catalog.state(); let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect(); tmp.sort(); tmp @@ -344,7 +349,7 @@ mod tests { // rebuild let path = object_store.new_path(); - let catalog = 
rebuild_catalog::( + let (catalog, state) = rebuild_catalog::( object_store, &path, server_id, @@ -357,7 +362,6 @@ mod tests { // check match let paths_actual = { - let state = catalog.state(); let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect(); tmp.sort(); tmp @@ -373,7 +377,7 @@ mod tests { let db_name = "db1"; // build empty catalog - let catalog = PreservedCatalog::::new_empty( + let (catalog, _state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name, @@ -388,7 +392,7 @@ mod tests { // rebuild let path = object_store.new_path(); - let catalog = rebuild_catalog::( + let (catalog, state) = rebuild_catalog::( object_store, &path, server_id, @@ -400,7 +404,6 @@ mod tests { .unwrap(); // check match - let state = catalog.state(); assert!(state.parquet_files.is_empty()); assert_eq!(catalog.revision_counter(), 0); } @@ -451,7 +454,7 @@ mod tests { let db_name = "db1"; // build catalog with same data - let catalog = PreservedCatalog::::new_empty( + let (catalog, state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name, @@ -460,7 +463,7 @@ mod tests { .await .unwrap(); { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let (path, md) = create_parquet_file( &object_store, @@ -484,7 +487,7 @@ mod tests { ) .await; - transaction.commit(false).await.unwrap(); + transaction.commit(None).await.unwrap(); } // wipe catalog @@ -544,7 +547,7 @@ mod tests { .starts_with("Cannot read IOx metadata from parquet file")); // rebuild (ignore errors) - let catalog = rebuild_catalog::( + let (catalog, state) = rebuild_catalog::( object_store, &path, server_id, @@ -554,7 +557,6 @@ mod tests { ) .await .unwrap(); - let state = catalog.state(); assert!(state.parquet_files.is_empty()); assert_eq!(catalog.revision_counter(), 0); } @@ -566,7 +568,7 @@ mod tests { let db_name = "db1"; // build catalog with some data (2 transactions + initial empty 
one) - let catalog = PreservedCatalog::::new_empty( + let (catalog, mut state) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, db_name, @@ -575,7 +577,7 @@ mod tests { .await .unwrap(); { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let (path, md) = create_parquet_file( &object_store, @@ -588,10 +590,10 @@ mod tests { .await; transaction.add_parquet(&path, &md).unwrap(); - transaction.commit(false).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } { - let mut transaction = catalog.open_transaction().await; + let mut transaction = catalog.open_transaction(state).await; let (path, md) = create_parquet_file( &object_store, @@ -604,13 +606,12 @@ mod tests { .await; transaction.add_parquet(&path, &md).unwrap(); - transaction.commit(false).await.unwrap(); + state = transaction.commit(None).await.unwrap(); } assert_eq!(catalog.revision_counter(), 2); // store catalog state let paths_expected = { - let state = catalog.state(); let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect(); tmp.sort(); tmp @@ -656,7 +657,7 @@ mod tests { assert!(deleted); // load catalog - let catalog = PreservedCatalog::::load( + let (catalog, state) = PreservedCatalog::::load( Arc::clone(&object_store), server_id, db_name.to_string(), @@ -668,7 +669,6 @@ mod tests { // check match let paths_actual = { - let state = catalog.state(); let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect(); tmp.sort(); tmp diff --git a/server/src/config.rs b/server/src/config.rs index cac388406b..b2712726be 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -238,6 +238,7 @@ impl Config { object_store: Arc, exec: Arc, preserved_catalog: PreservedCatalog, + catalog: Arc, ) { let mut state = self.state.write().expect("mutex poisoned"); let name = rules.name.clone(); @@ -263,6 +264,7 @@ impl Config { exec, Arc::clone(&self.jobs), preserved_catalog, + catalog, 
write_buffer, )); @@ -451,6 +453,7 @@ impl<'a> CreateDatabaseHandle<'a> { object_store: Arc, exec: Arc, preserved_catalog: PreservedCatalog, + catalog: Arc, rules: DatabaseRules, ) -> Result<()> { let db_name = self.db_name.take().expect("not committed"); @@ -462,8 +465,14 @@ impl<'a> CreateDatabaseHandle<'a> { }); } - self.config - .commit_db(rules, server_id, object_store, exec, preserved_catalog); + self.config.commit_db( + rules, + server_id, + object_store, + exec, + preserved_catalog, + catalog, + ); Ok(()) } @@ -531,6 +540,7 @@ impl<'a> RecoverDatabaseHandle<'a> { object_store: Arc, exec: Arc, preserved_catalog: PreservedCatalog, + catalog: Arc, rules: Option, ) -> Result<()> { let db_name = self.db_name.take().expect("not committed"); @@ -547,8 +557,14 @@ impl<'a> RecoverDatabaseHandle<'a> { }); } - self.config - .commit_db(rules, server_id, object_store, exec, preserved_catalog); + self.config.commit_db( + rules, + server_id, + object_store, + exec, + preserved_catalog, + catalog, + ); Ok(()) } @@ -623,7 +639,7 @@ mod test { let server_id = ServerId::try_from(1).unwrap(); let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); let exec = Arc::new(Executor::new(1)); - let preserved_catalog = load_or_create_preserved_catalog( + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( &name, Arc::clone(&store), server_id, @@ -642,13 +658,14 @@ mod test { Arc::clone(&store), Arc::clone(&exec), preserved_catalog, + catalog, rules.clone(), ) .unwrap_err(); assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. 
})); } - let preserved_catalog = load_or_create_preserved_catalog( + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( &name, Arc::clone(&store), server_id, @@ -659,7 +676,7 @@ mod test { .unwrap(); let db_reservation = config.create_db(name.clone()).unwrap(); db_reservation - .commit_db(server_id, store, exec, preserved_catalog, rules) + .commit_db(server_id, store, exec, preserved_catalog, catalog, rules) .unwrap(); assert!(config.db(&name).is_some()); assert_eq!(config.db_names_sorted(), vec![name.clone()]); @@ -733,7 +750,7 @@ mod test { let server_id = ServerId::try_from(1).unwrap(); let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); let exec = Arc::new(Executor::new(1)); - let preserved_catalog = load_or_create_preserved_catalog( + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( &name, Arc::clone(&store), server_id, @@ -750,13 +767,14 @@ mod test { Arc::clone(&store), Arc::clone(&exec), preserved_catalog, + catalog, Some(DatabaseRules::new(DatabaseName::new("bar").unwrap())), ) .unwrap_err(); assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. 
})); } - let preserved_catalog = load_or_create_preserved_catalog( + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( &name, Arc::clone(&store), server_id, @@ -768,7 +786,7 @@ mod test { let db_reservation = config.recover_db(name.clone()).unwrap(); assert!(db_reservation.has_rules()); db_reservation - .commit_db(server_id, store, exec, preserved_catalog, None) + .commit_db(server_id, store, exec, preserved_catalog, catalog, None) .unwrap(); assert!(config.db(&name).is_some()); assert_eq!(config.db_names_sorted(), vec![name.clone()]); @@ -829,7 +847,7 @@ mod test { let server_id = ServerId::try_from(1).unwrap(); let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); let exec = Arc::new(Executor::new(1)); - let preserved_catalog = load_or_create_preserved_catalog( + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( &name, Arc::clone(&store), server_id, @@ -839,7 +857,7 @@ mod test { .await .unwrap(); db_reservation - .commit_db(server_id, store, exec, preserved_catalog, rules) + .commit_db(server_id, store, exec, preserved_catalog, catalog, rules) .unwrap(); let token = config diff --git a/server/src/db.rs b/server/src/db.rs index 533ca7df69..5ee7b4fe14 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -30,8 +30,7 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; use object_store::{path::parsed::DirsAndFileName, ObjectStore}; use observability_deps::tracing::{debug, error, info, warn}; use parking_lot::RwLock; -use parquet_file::catalog::TransactionEnd; -use parquet_file::metadata::IoxParquetMetaData; +use parquet_file::catalog::{CheckpointData, TransactionEnd}; use parquet_file::{ catalog::{ wipe as wipe_preserved_catalog, CatalogParquetInfo, CatalogState, ChunkCreationFailed, @@ -221,6 +220,9 @@ pub struct Db { /// Executor for running queries exec: Arc, + /// Preserved catalog (data in object store). 
+ preserved_catalog: Arc>, + /// The catalog holds chunks of data under partitions for the database. /// The underlying chunks may be backed by different execution engines /// depending on their stage in the data lifecycle. Currently there are @@ -230,7 +232,7 @@ pub struct Db { /// - The Read Buffer where chunks are immutable and stored in an optimised /// compressed form for small footprint and fast query execution; and /// - The Parquet Buffer where chunks are backed by Parquet file data. - preserved_catalog: Arc>, + catalog: Arc, /// A handle to the global jobs registry for long running tasks jobs: Arc, @@ -270,7 +272,7 @@ pub async fn load_or_create_preserved_catalog( server_id: ServerId, metrics_registry: Arc, wipe_on_error: bool, -) -> std::result::Result, parquet_file::catalog::Error> { +) -> std::result::Result<(PreservedCatalog, Arc), parquet_file::catalog::Error> { let metric_labels = vec![ KeyValue::new("db_name", db_name.to_string()), KeyValue::new("svr_id", format!("{}", server_id)), @@ -347,6 +349,7 @@ pub async fn load_or_create_preserved_catalog( } impl Db { + #[allow(clippy::clippy::too_many_arguments)] pub fn new( rules: DatabaseRules, server_id: ServerId, @@ -354,6 +357,7 @@ impl Db { exec: Arc, jobs: Arc, preserved_catalog: PreservedCatalog, + catalog: Arc, write_buffer: Option>, ) -> Self { let db_name = rules.name.clone(); @@ -361,12 +365,12 @@ impl Db { let rules = RwLock::new(rules); let server_id = server_id; let store = Arc::clone(&object_store); - let metrics_registry = Arc::clone(&preserved_catalog.state().metrics_registry); - let metric_labels = preserved_catalog.state().metric_labels.clone(); + let metrics_registry = Arc::clone(&catalog.metrics_registry); + let metric_labels = catalog.metric_labels.clone(); let catalog_access = QueryCatalogAccess::new( &db_name, - preserved_catalog.state(), + Arc::clone(&catalog), Arc::clone(&jobs), Arc::clone(&metrics_registry), metric_labels.clone(), @@ -381,6 +385,7 @@ impl Db { store, exec, 
preserved_catalog: Arc::new(preserved_catalog), + catalog, jobs, metrics_registry, catalog_access, @@ -424,10 +429,7 @@ impl Db { table_name: &str, partition_key: &str, ) -> catalog::Result>> { - let partition = self - .preserved_catalog - .state() - .partition(table_name, partition_key)?; + let partition = self.catalog.partition(table_name, partition_key)?; Ok(Arc::clone(&partition)) } @@ -437,9 +439,7 @@ impl Db { partition_key: &str, chunk_id: u32, ) -> catalog::Result>> { - self.preserved_catalog - .state() - .chunk(table_name, partition_key, chunk_id) + self.catalog.chunk(table_name, partition_key, chunk_id) } pub fn lockable_chunk( @@ -554,14 +554,7 @@ impl Db { let mut rb_chunk = RBChunk::new( &table_summary.name, - ReadBufferChunkMetrics::new( - &metrics, - db.preserved_catalog - .state() - .metrics() - .memory() - .read_buffer(), - ), + ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()), ); let fut = async move { @@ -654,7 +647,8 @@ impl Db { .lifecycle_rules .catalog_transactions_until_checkpoint; - let catalog = Arc::clone(&db.preserved_catalog); + let preserved_catalog = Arc::clone(&db.preserved_catalog); + let catalog = Arc::clone(&db.catalog); let fut = async move { let table_name = table_summary.name.as_str(); @@ -676,7 +670,7 @@ impl Db { // catalog-level transaction for preservation layer { - let mut transaction = catalog.open_transaction().await; + let mut transaction = preserved_catalog.open_transaction(catalog).await; // Write this table data into the object store // @@ -701,18 +695,28 @@ impl Db { ) .await .context(WritingToObjectStore)?; + let path: DirsAndFileName = path.into(); transaction - .add_parquet(&path.into(), &parquet_metadata) + .add_parquet(&path, &parquet_metadata) .context(TransactionError)?; let create_checkpoint = catalog_transactions_until_checkpoint .map_or(false, |interval| { transaction.revision_counter() % interval.get() == 0 }); + let checkpoint_data = create_checkpoint.then(|| { + let mut 
checkpoint_data = + checkpoint_data_from_catalog(&transaction.tstate().catalog); + // don't forget the file that we've just added + checkpoint_data + .files + .insert(path, Arc::new(parquet_metadata)); + checkpoint_data + }); transaction - .commit(create_checkpoint) + .commit(checkpoint_data) .await .context(TransactionError)?; } @@ -756,11 +760,8 @@ impl Db { pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec { let partition_key = Some(partition_key); let table_names = TableNameFilter::AllTables; - self.preserved_catalog.state().filtered_chunks( - table_names, - partition_key, - CatalogChunk::summary, - ) + self.catalog + .filtered_chunks(table_names, partition_key, CatalogChunk::summary) } /// Return Summary information for all columns in all chunks in the @@ -770,8 +771,7 @@ impl Db { table_name: &str, partition_key: &str, ) -> Option { - self.preserved_catalog - .state() + self.catalog .partition(table_name, partition_key) .map(|partition| partition.read().summary()) .ok() @@ -840,7 +840,7 @@ impl Db { debug!(?duration, "cleanup worker sleeps"); tokio::time::sleep(duration).await; - if let Err(e) = cleanup_unreferenced_parquet_files(&self.preserved_catalog, 1_000).await { + if let Err(e) = cleanup_unreferenced_parquet_files(&self.preserved_catalog, Arc::clone(&self.catalog), 1_000).await { error!(%e, "error in background cleanup task"); } } => {}, @@ -918,7 +918,7 @@ impl Db { return DatabaseNotWriteable {}.fail(); } if let Some(hard_limit) = buffer_size_hard { - if self.preserved_catalog.state().metrics().memory().total() > hard_limit.get() { + if self.catalog.metrics().memory().total() > hard_limit.get() { return HardLimitReached {}.fail(); } } @@ -932,8 +932,7 @@ impl Db { } let partition = self - .preserved_catalog - .state() + .catalog .get_or_create_partition(table_batch.name(), partition_key); let mut partition = partition.write(); @@ -970,11 +969,7 @@ impl Db { table_batch.name(), MutableBufferChunkMetrics::new( &metrics, - 
self.preserved_catalog - .state() - .metrics() - .memory() - .mutable_buffer(), + self.catalog.metrics().memory().mutable_buffer(), ), ); @@ -1349,20 +1344,20 @@ impl CatalogState for Catalog { Ok(()) } } +} - fn files(&self) -> HashMap> { - let mut files = HashMap::new(); +fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData { + let mut files = HashMap::new(); - for chunk in self.chunks() { - let guard = chunk.read(); - if let catalog::chunk::ChunkStage::Persisted { parquet, .. } = guard.stage() { - let path: DirsAndFileName = parquet.path().into(); - files.insert(path, parquet.parquet_metadata()); - } + for chunk in catalog.chunks() { + let guard = chunk.read(); + if let catalog::chunk::ChunkStage::Persisted { parquet, .. } = guard.stage() { + let path: DirsAndFileName = parquet.path().into(); + files.insert(path, parquet.parquet_metadata()); } - - files } + + CheckpointData { files } } pub mod test_helpers { @@ -1430,6 +1425,7 @@ mod tests { }; use parquet_file::{ catalog::test_helpers::assert_catalog_state_implementation, + metadata::IoxParquetMetaData, test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data}, }; use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; @@ -2175,11 +2171,7 @@ mod tests { let after_write = Utc::now(); let last_write_prev = { - let partition = db - .preserved_catalog - .state() - .partition("cpu", partition_key) - .unwrap(); + let partition = db.catalog.partition("cpu", partition_key).unwrap(); let partition = partition.read(); assert_ne!(partition.created_at(), partition.last_write_at()); @@ -2190,11 +2182,7 @@ mod tests { write_lp(&db, "cpu bar=1 20"); { - let partition = db - .preserved_catalog - .state() - .partition("cpu", partition_key) - .unwrap(); + let partition = db.catalog.partition("cpu", partition_key).unwrap(); let partition = partition.read(); assert!(last_write_prev < partition.last_write_at()); } @@ -2219,11 +2207,7 @@ mod tests { .id(); let after_rollover = 
Utc::now(); - let partition = db - .preserved_catalog - .state() - .partition("cpu", partition_key) - .unwrap(); + let partition = db.catalog.partition("cpu", partition_key).unwrap(); let partition = partition.read(); let chunk = partition.chunk(chunk_id).unwrap(); let chunk = chunk.read(); @@ -2251,15 +2235,11 @@ mod tests { write_lp(&db, "cpu bar=1 10"); write_lp(&db, "cpu bar=1 20"); - let partitions = db.preserved_catalog.state().partition_keys(); + let partitions = db.catalog.partition_keys(); assert_eq!(partitions.len(), 1); let partition_key = partitions.into_iter().next().unwrap(); - let partition = db - .preserved_catalog - .state() - .partition("cpu", &partition_key) - .unwrap(); + let partition = db.catalog.partition("cpu", &partition_key).unwrap(); let partition = partition.read(); let chunks: Vec<_> = partition.chunks().collect(); @@ -2292,7 +2272,7 @@ mod tests { order: Order::Desc, sort: Sort::LastWriteTime, }; - let chunks = db.preserved_catalog.state().chunks_sorted_by(&sort_rules); + let chunks = db.catalog.chunks_sorted_by(&sort_rules); let partitions: Vec<_> = chunks .into_iter() .map(|x| x.read().key().to_string()) @@ -2304,7 +2284,7 @@ mod tests { order: Order::Asc, sort: Sort::CreatedAtTime, }; - let chunks = db.preserved_catalog.state().chunks_sorted_by(&sort_rules); + let chunks = db.catalog.chunks_sorted_by(&sort_rules); let partitions: Vec<_> = chunks .into_iter() .map(|x| x.read().key().to_string()) @@ -2414,12 +2394,7 @@ mod tests { .sum(); assert_eq!( - db.preserved_catalog - .state() - .metrics() - .memory() - .mutable_buffer() - .get_total(), + db.catalog.metrics().memory().mutable_buffer().get_total(), size ); @@ -2552,32 +2527,14 @@ mod tests { ); assert_eq!( - db.preserved_catalog - .state() - .metrics() - .memory() - .mutable_buffer() - .get_total(), + db.catalog.metrics().memory().mutable_buffer().get_total(), 64 + 2190 + 87 ); assert_eq!( - db.preserved_catalog - .state() - .metrics() - .memory() - .read_buffer() - 
.get_total(), + db.catalog.metrics().memory().read_buffer().get_total(), 1484 ); - assert_eq!( - db.preserved_catalog - .state() - .metrics() - .memory() - .parquet() - .get_total(), - 663 - ); + assert_eq!(db.catalog.metrics().memory().parquet().get_total(), 663); } #[tokio::test] @@ -2863,7 +2820,7 @@ mod tests { .eq(0.) .unwrap(); - let chunks = db.preserved_catalog.state().chunks(); + let chunks = db.catalog.chunks(); assert_eq!(chunks.len(), 1); let chunk_a = Arc::clone(&chunks[0]); @@ -2958,7 +2915,7 @@ mod tests { let server_id = ServerId::try_from(1).unwrap(); let db_name = "preserved_catalog_test"; - let preserved_catalog = + let (preserved_catalog, _catalog) = PreservedCatalog::::new_empty( Arc::clone(&object_store), server_id, @@ -3029,7 +2986,7 @@ mod tests { } } paths_expected.sort(); - let preserved_catalog = + let (_preserved_catalog, catalog) = PreservedCatalog::::load( Arc::clone(&object_store), server_id, @@ -3040,8 +2997,7 @@ mod tests { .unwrap() .unwrap(); let paths_actual = { - let state = preserved_catalog.state(); - let mut tmp: Vec = state.parquet_files.keys().map(|p| p.display()).collect(); + let mut tmp: Vec = catalog.parquet_files.keys().map(|p| p.display()).collect(); tmp.sort(); tmp }; @@ -3251,7 +3207,11 @@ mod tests { metrics_registry, metric_labels: vec![], }; - assert_catalog_state_implementation::(empty_input).await; + assert_catalog_state_implementation::( + empty_input, + checkpoint_data_from_catalog, + ) + .await; } async fn create_parquet_chunk(db: &Db) -> (String, String, u32) { diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 5f370db56c..2ea500ad5e 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -77,7 +77,7 @@ impl<'a> LifecycleDb for &'a Db { type Chunk = LockableCatalogChunk<'a>; fn buffer_size(self) -> usize { - self.preserved_catalog.state().metrics().memory().total() + self.catalog.metrics().memory().total() } fn rules(self) -> LifecycleRules { @@ -85,8 +85,7 @@ 
impl<'a> LifecycleDb for &'a Db { } fn chunks(self, sort_order: &SortOrder) -> Vec { - self.preserved_catalog - .state() + self.catalog .chunks_sorted_by(sort_order) .into_iter() .map(|chunk| LockableCatalogChunk { db: self, chunk }) diff --git a/server/src/init.rs b/server/src/init.rs index 9706a261e0..efaa6cd479 100644 --- a/server/src/init.rs +++ b/server/src/init.rs @@ -325,10 +325,10 @@ impl InitStatus { .map_err(|e| Box::new(e) as _) .context(CatalogLoadError) { - Ok(preserved_catalog) => { + Ok((preserved_catalog, catalog)) => { // everything is there, can create DB handle - .commit_db(server_id, store, exec, preserved_catalog, rules) + .commit_db(server_id, store, exec, preserved_catalog, catalog, rules) .map_err(Box::new) .context(CreateDbError)?; Ok(()) @@ -418,9 +418,9 @@ impl InitStatus { .map_err(|e| Box::new(e) as _) .context(CatalogLoadError) { - Ok(preserved_catalog) => { + Ok((preserved_catalog, catalog)) => { handle - .commit_db(server_id, store, exec, preserved_catalog, None) + .commit_db(server_id, store, exec, preserved_catalog, catalog, None) .map_err(|e | { warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover"); Box::new(e) diff --git a/server/src/lib.rs b/server/src/lib.rs index 656b3e1a4a..47ae2a5c53 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -487,7 +487,7 @@ where let db_reservation = self.config.create_db(rules.name.clone())?; self.persist_database_rules(rules.clone()).await?; - let preserved_catalog = load_or_create_preserved_catalog( + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( rules.db_name(), Arc::clone(&self.store), server_id, @@ -503,6 +503,7 @@ where Arc::clone(&self.store), Arc::clone(&self.exec), preserved_catalog, + catalog, rules, )?; @@ -1962,7 +1963,7 @@ mod tests { ) .await .unwrap(); - let preserved_catalog = PreservedCatalog::::load( + let (preserved_catalog, _catalog) = PreservedCatalog::::load( Arc::clone(&store), server_id, 
db_name_catalog_broken.to_string(), diff --git a/server/src/utils.rs b/server/src/utils.rs index 4aaec630ce..298029b579 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -57,7 +57,7 @@ impl TestDbBuilder { let exec = Arc::new(Executor::new(1)); let metrics_registry = Arc::new(metrics::MetricRegistry::new()); - let preserved_catalog = load_or_create_preserved_catalog( + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( db_name.as_str(), Arc::clone(&object_store), server_id, @@ -87,6 +87,7 @@ impl TestDbBuilder { exec, Arc::new(JobRegistry::new()), preserved_catalog, + catalog, self.write_buffer, ), }