Merge pull request #1766 from influxdata/crepererum/issue1740-a

refactor: decouple preserved and in-mem catalog (part 1)
pull/24376/head
kodiakhq[bot] 2021-06-22 07:24:17 +00:00 committed by GitHub
commit f79349bce0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 384 additions and 380 deletions

View File

@ -6,6 +6,7 @@ use std::{
}, },
convert::TryInto, convert::TryInto,
fmt::{Debug, Display}, fmt::{Debug, Display},
marker::PhantomData,
num::TryFromIntError, num::TryFromIntError,
str::FromStr, str::FromStr,
sync::Arc, sync::Arc,
@ -254,11 +255,6 @@ pub trait CatalogState {
/// Remove parquet file from state. /// Remove parquet file from state.
fn remove(tstate: &mut Self::TransactionState, path: DirsAndFileName) -> Result<()>; fn remove(tstate: &mut Self::TransactionState, path: DirsAndFileName) -> Result<()>;
/// List all Parquet files that are currently (i.e. by the current version) tracked by the catalog.
///
/// If a file was once [added](Self::add) but later [removed](Self::remove) it MUST NOT appear in the result.
fn files(&self) -> HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>;
} }
/// Find last transaction-start-timestamp. /// Find last transaction-start-timestamp.
@ -290,15 +286,6 @@ pub async fn find_last_transaction_timestamp(
Ok(res) Ok(res)
} }
/// Inner mutable part of the preserved catalog.
struct PreservedCatalogInner<S>
where
S: CatalogState,
{
previous_tkey: Option<TransactionKey>,
state: Arc<S>,
}
/// In-memory view of the preserved catalog. /// In-memory view of the preserved catalog.
pub struct PreservedCatalog<S> pub struct PreservedCatalog<S>
where where
@ -322,12 +309,15 @@ where
// 9. release semaphore // 9. release semaphore
// //
// Note that there can only be a single transaction that acquires the semaphore. // Note that there can only be a single transaction that acquires the semaphore.
inner: RwLock<PreservedCatalogInner<S>>, previous_tkey: RwLock<Option<TransactionKey>>,
transaction_semaphore: Semaphore, transaction_semaphore: Semaphore,
object_store: Arc<ObjectStore>, object_store: Arc<ObjectStore>,
server_id: ServerId, server_id: ServerId,
db_name: String, db_name: String,
// temporary measure to keep the API a bit more stable
phantom: PhantomData<S>,
} }
/// Deletes catalog. /// Deletes catalog.
@ -371,31 +361,28 @@ where
server_id: ServerId, server_id: ServerId,
db_name: impl Into<String> + Send, db_name: impl Into<String> + Send,
state_data: S::EmptyInput, state_data: S::EmptyInput,
) -> Result<Self> { ) -> Result<(Self, Arc<S>)> {
let db_name = db_name.into(); let db_name = db_name.into();
if Self::exists(&object_store, server_id, &db_name).await? { if Self::exists(&object_store, server_id, &db_name).await? {
return Err(Error::AlreadyExists {}); return Err(Error::AlreadyExists {});
} }
let state = Arc::new(S::new_empty(&db_name, state_data));
let inner = PreservedCatalogInner {
previous_tkey: None,
state: Arc::new(S::new_empty(&db_name, state_data)),
};
let catalog = Self { let catalog = Self {
inner: RwLock::new(inner), previous_tkey: RwLock::new(None),
transaction_semaphore: Semaphore::new(1), transaction_semaphore: Semaphore::new(1),
object_store, object_store,
server_id, server_id,
db_name, db_name,
phantom: PhantomData,
}; };
// add empty transaction // add empty transaction
let transaction = catalog.open_transaction().await; let transaction = catalog.open_transaction(state).await;
transaction.commit(false).await?; let state = transaction.commit(None).await?;
Ok(catalog) Ok((catalog, state))
} }
/// Load existing catalog from store, if it exists. /// Load existing catalog from store, if it exists.
@ -407,7 +394,7 @@ where
server_id: ServerId, server_id: ServerId,
db_name: String, db_name: String,
state_data: S::EmptyInput, state_data: S::EmptyInput,
) -> Result<Option<Self>> { ) -> Result<Option<(Self, Arc<S>)>> {
// parse all paths into revisions // parse all paths into revisions
let mut transactions: HashMap<u64, Uuid> = HashMap::new(); let mut transactions: HashMap<u64, Uuid> = HashMap::new();
let mut max_revision = None; let mut max_revision = None;
@ -462,7 +449,7 @@ where
} }
// setup empty state // setup empty state
let mut state = Arc::new(CatalogState::new_empty(&db_name, state_data)); let mut state = Arc::new(S::new_empty(&db_name, state_data));
let mut last_tkey = None; let mut last_tkey = None;
// detect replay start // detect replay start
@ -498,18 +485,17 @@ where
last_tkey = Some(tkey); last_tkey = Some(tkey);
} }
let inner = PreservedCatalogInner { Ok(Some((
previous_tkey: last_tkey, Self {
state, previous_tkey: RwLock::new(last_tkey),
};
Ok(Some(Self {
inner: RwLock::new(inner),
transaction_semaphore: Semaphore::new(1), transaction_semaphore: Semaphore::new(1),
object_store, object_store,
server_id, server_id,
db_name, db_name,
})) phantom: PhantomData,
},
state,
)))
} }
/// Open a new transaction. /// Open a new transaction.
@ -518,26 +504,24 @@ where
/// transaction handle is dropped. The newly created transaction will contain the state after `await` (esp. /// transaction handle is dropped. The newly created transaction will contain the state after `await` (esp.
/// post-blocking). This system is fair, which means that transactions are given out in the order they were /// post-blocking). This system is fair, which means that transactions are given out in the order they were
/// requested. /// requested.
pub async fn open_transaction(&self) -> TransactionHandle<'_, S> { pub async fn open_transaction(&self, state: Arc<S>) -> TransactionHandle<'_, S> {
self.open_transaction_with_uuid(Uuid::new_v4()).await self.open_transaction_with_uuid(Uuid::new_v4(), state).await
} }
/// Crate-private API to open an transaction with a specified UUID. Should only be used for catalog rebuilding or /// Crate-private API to open an transaction with a specified UUID. Should only be used for catalog rebuilding or
/// with a fresh V4-UUID! /// with a fresh V4-UUID!
pub(crate) async fn open_transaction_with_uuid(&self, uuid: Uuid) -> TransactionHandle<'_, S> { pub(crate) async fn open_transaction_with_uuid(
TransactionHandle::new(self, uuid).await &self,
} uuid: Uuid,
state: Arc<S>,
/// Return current state. ) -> TransactionHandle<'_, S> {
pub fn state(&self) -> Arc<S> { TransactionHandle::new(self, uuid, state).await
Arc::clone(&self.inner.read().state)
} }
/// Get latest revision counter. /// Get latest revision counter.
pub fn revision_counter(&self) -> u64 { pub fn revision_counter(&self) -> u64 {
self.inner self.previous_tkey
.read() .read()
.previous_tkey
.clone() .clone()
.map(|tkey| tkey.revision_counter) .map(|tkey| tkey.revision_counter)
.expect("catalog should have at least an empty transaction") .expect("catalog should have at least an empty transaction")
@ -835,14 +819,14 @@ where
S: CatalogState + Send + Sync, S: CatalogState + Send + Sync,
{ {
/// Private API to create new transaction, users should always use [`PreservedCatalog::open_transaction`]. /// Private API to create new transaction, users should always use [`PreservedCatalog::open_transaction`].
fn new(catalog_inner: &PreservedCatalogInner<S>, uuid: Uuid) -> Self { fn new(previous_tkey: &Option<TransactionKey>, uuid: Uuid, state: Arc<S>) -> Self {
let (revision_counter, previous_uuid) = match &catalog_inner.previous_tkey { let (revision_counter, previous_uuid) = match previous_tkey {
Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()), Some(tkey) => (tkey.revision_counter + 1, tkey.uuid.to_string()),
None => (0, String::new()), None => (0, String::new()),
}; };
Self { Self {
tstate: S::transaction_begin(&catalog_inner.state), tstate: S::transaction_begin(&state),
proto: proto::Transaction { proto: proto::Transaction {
actions: vec![], actions: vec![],
version: TRANSACTION_VERSION, version: TRANSACTION_VERSION,
@ -917,16 +901,19 @@ where
} }
/// Commit to mutable catalog and return previous transaction key. /// Commit to mutable catalog and return previous transaction key.
fn commit(self, catalog_inner: &mut PreservedCatalogInner<S>) -> Option<TransactionKey> { fn commit(
self,
previous_tkey: &mut Option<TransactionKey>,
) -> (Option<TransactionKey>, Arc<S>) {
let mut tkey = Some(self.tkey()); let mut tkey = Some(self.tkey());
catalog_inner.state = S::transaction_end(self.tstate, TransactionEnd::Commit); let state = S::transaction_end(self.tstate, TransactionEnd::Commit);
std::mem::swap(&mut catalog_inner.previous_tkey, &mut tkey); std::mem::swap(previous_tkey, &mut tkey);
tkey (tkey, state)
} }
/// Abort transaction /// Abort transaction
fn abort(self, catalog_inner: &mut PreservedCatalogInner<S>) { fn abort(self) -> Arc<S> {
catalog_inner.state = S::transaction_end(self.tstate, TransactionEnd::Abort); S::transaction_end(self.tstate, TransactionEnd::Abort)
} }
async fn store( async fn store(
@ -1026,6 +1013,18 @@ where
} }
} }
/// Structure that holds all information required to create a checkpoint.
///
/// Note that while checkpoint are addressed using the same schema as we use for transaction (revision counter, UUID),
/// they contain the changes at the end (aka including) the transaction they refer.
#[derive(Debug)]
pub struct CheckpointData {
/// List of all Parquet files that are currently (i.e. by the current version) tracked by the catalog.
///
/// If a file was once added but later removed it MUST NOT appear in the result.
pub files: HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>,
}
/// Handle for an open uncommitted transaction. /// Handle for an open uncommitted transaction.
/// ///
/// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning. /// Dropping this object w/o calling [`commit`](Self::commit) will issue a warning.
@ -1047,19 +1046,23 @@ impl<'c, S> TransactionHandle<'c, S>
where where
S: CatalogState + Send + Sync, S: CatalogState + Send + Sync,
{ {
async fn new(catalog: &'c PreservedCatalog<S>, uuid: Uuid) -> TransactionHandle<'c, S> { async fn new(
catalog: &'c PreservedCatalog<S>,
uuid: Uuid,
state: Arc<S>,
) -> TransactionHandle<'c, S> {
// first acquire semaphore (which is only being used for transactions), then get state lock // first acquire semaphore (which is only being used for transactions), then get state lock
let permit = catalog let permit = catalog
.transaction_semaphore .transaction_semaphore
.acquire() .acquire()
.await .await
.expect("semaphore should not be closed"); .expect("semaphore should not be closed");
let inner_guard = catalog.inner.write(); let previous_tkey_guard = catalog.previous_tkey.write();
let transaction = OpenTransaction::new(&inner_guard, uuid); let transaction = OpenTransaction::new(&previous_tkey_guard, uuid, state);
// free state for readers again // free state for readers again
drop(inner_guard); drop(previous_tkey_guard);
let tkey = transaction.tkey(); let tkey = transaction.tkey();
info!(?tkey, "transaction started"); info!(?tkey, "transaction started");
@ -1071,6 +1074,15 @@ where
} }
} }
/// Get current transaction state.
pub fn tstate(&self) -> &S::TransactionState {
&self
.transaction
.as_ref()
.expect("No transaction in progress?")
.tstate
}
/// Get revision counter for this transaction. /// Get revision counter for this transaction.
pub fn revision_counter(&self) -> u64 { pub fn revision_counter(&self) -> u64 {
self.transaction self.transaction
@ -1093,10 +1105,13 @@ where
/// ///
/// This will first commit to object store and then to the in-memory state. /// This will first commit to object store and then to the in-memory state.
/// ///
/// If `create_checkpoint` is set this will also create a checkpoint at the end of the commit. Note that if the /// # Checkpointing
/// If `checkpoint_data` is passed this will also create a checkpoint at the end of the commit. Note that if the
/// checkpoint creation fails, the commit will still be treated as completed since the checkpoint is a mere /// checkpoint creation fails, the commit will still be treated as completed since the checkpoint is a mere
/// optimization to speed up transaction replay and allow to prune the history. /// optimization to speed up transaction replay and allow to prune the history.
pub async fn commit(mut self, create_checkpoint: bool) -> Result<()> { ///
/// Note that `checkpoint_data` must contain the state INCLUDING the to-be-commited transaction.
pub async fn commit(mut self, checkpoint_data: Option<CheckpointData>) -> Result<Arc<S>> {
let t = std::mem::take(&mut self.transaction) let t = std::mem::take(&mut self.transaction)
.expect("calling .commit on a closed transaction?!"); .expect("calling .commit on a closed transaction?!");
let tkey = t.tkey(); let tkey = t.tkey();
@ -1118,17 +1133,18 @@ where
// maybe create a checkpoint // maybe create a checkpoint
// IMPORTANT: Create the checkpoint AFTER commiting the transaction to object store and to the in-memory state. // IMPORTANT: Create the checkpoint AFTER commiting the transaction to object store and to the in-memory state.
// Checkpoints are an optional optimization and are not required to materialize a transaction. // Checkpoints are an optional optimization and are not required to materialize a transaction.
if create_checkpoint { if let Some(checkpoint_data) = checkpoint_data {
// NOTE: `inner_guard` will not be re-used here since it is a strong write lock and the checkpoint creation // NOTE: `inner_guard` will not be re-used here since it is a strong write lock and the checkpoint creation
// only needs a read lock. // only needs a read lock.
self.create_checkpoint(tkey, previous_tkey, state).await?; self.create_checkpoint(tkey, previous_tkey, checkpoint_data)
.await?;
} }
Ok(()) Ok(state)
} }
Err(e) => { Err(e) => {
warn!(?tkey, "failure while writing transaction, aborting"); warn!(?tkey, "failure while writing transaction, aborting");
self.abort_inner(t); t.abort();
Err(e) Err(e)
} }
} }
@ -1142,32 +1158,23 @@ where
/// - rustc seems to fold the guard into the async generator state even when we `drop` it quickly, making the /// - rustc seems to fold the guard into the async generator state even when we `drop` it quickly, making the
/// resulting future `!Send`. However tokio requires our futures to be `Send`. /// resulting future `!Send`. However tokio requires our futures to be `Send`.
fn commit_inner(&self, t: OpenTransaction<S>) -> (Option<TransactionKey>, Arc<S>) { fn commit_inner(&self, t: OpenTransaction<S>) -> (Option<TransactionKey>, Arc<S>) {
let mut inner_guard = self.catalog.inner.write(); let mut previous_tkey_guard = self.catalog.previous_tkey.write();
let previous_tkey = t.commit(&mut inner_guard); t.commit(&mut previous_tkey_guard)
let state = Arc::clone(&inner_guard.state);
(previous_tkey, state)
}
fn abort_inner(&self, t: OpenTransaction<S>) {
let mut inner_guard = self.catalog.inner.write();
t.abort(&mut inner_guard);
} }
async fn create_checkpoint( async fn create_checkpoint(
&self, &self,
tkey: TransactionKey, tkey: TransactionKey,
previous_tkey: Option<TransactionKey>, previous_tkey: Option<TransactionKey>,
state: Arc<S>, checkpoint_data: CheckpointData,
) -> Result<()> { ) -> Result<()> {
let files = state.files();
let object_store = self.catalog.object_store(); let object_store = self.catalog.object_store();
let server_id = self.catalog.server_id(); let server_id = self.catalog.server_id();
let db_name = self.catalog.db_name(); let db_name = self.catalog.db_name();
// sort by key (= path) for deterministic output // sort by key (= path) for deterministic output
let files = { let files = {
let mut tmp: Vec<_> = files.into_iter().collect(); let mut tmp: Vec<_> = checkpoint_data.files.into_iter().collect();
tmp.sort_by_key(|(path, _metadata)| path.clone()); tmp.sort_by_key(|(path, _metadata)| path.clone());
tmp tmp
}; };
@ -1211,10 +1218,10 @@ where
} }
/// Abort transaction w/o commit. /// Abort transaction w/o commit.
pub fn abort(mut self) { pub fn abort(mut self) -> Arc<S> {
let t = std::mem::take(&mut self.transaction) let t = std::mem::take(&mut self.transaction)
.expect("calling .commit on a closed transaction?!"); .expect("calling .commit on a closed transaction?!");
self.abort_inner(t); t.abort()
} }
/// Add a new parquet file to the catalog. /// Add a new parquet file to the catalog.
@ -1272,7 +1279,7 @@ where
fn drop(&mut self) { fn drop(&mut self) {
if let Some(t) = self.transaction.take() { if let Some(t) = self.transaction.take() {
warn!(?self, "dropped uncommitted transaction, calling abort"); warn!(?self, "dropped uncommitted transaction, calling abort");
self.abort_inner(t); t.abort();
} }
} }
} }
@ -1292,10 +1299,19 @@ pub mod test_helpers {
pub parquet_files: HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>, pub parquet_files: HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>,
} }
impl TestCatalogState {
/// Simple way to create [`CheckpointData`].
pub fn checkpoint_data(&self) -> CheckpointData {
CheckpointData {
files: self.parquet_files.clone(),
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct TState { pub struct TState {
old: Arc<TestCatalogState>, pub old: Arc<TestCatalogState>,
new: TestCatalogState, pub new: TestCatalogState,
} }
impl CatalogState for TestCatalogState { impl CatalogState for TestCatalogState {
@ -1354,10 +1370,6 @@ pub mod test_helpers {
Ok(()) Ok(())
} }
fn files(&self) -> HashMap<DirsAndFileName, Arc<IoxParquetMetaData>> {
self.parquet_files.clone()
}
} }
/// Break preserved catalog by moving one of the transaction files into a weird unknown version. /// Break preserved catalog by moving one of the transaction files into a weird unknown version.
@ -1387,22 +1399,24 @@ pub mod test_helpers {
where where
S: CatalogState + Send + Sync, S: CatalogState + Send + Sync,
{ {
let guard = catalog.inner.read(); let guard = catalog.previous_tkey.read();
guard guard
.previous_tkey
.as_ref() .as_ref()
.expect("should have at least a single transaction") .expect("should have at least a single transaction")
.clone() .clone()
} }
/// Torture-test implementations for [`CatalogState`]. /// Torture-test implementations for [`CatalogState`].
pub async fn assert_catalog_state_implementation<S>(state_data: S::EmptyInput) ///
/// A function to extract [`CheckpointData`] from the [`CatalogState`] must be provided.
pub async fn assert_catalog_state_implementation<S, F>(state_data: S::EmptyInput, f: F)
where where
S: CatalogState + Send + Sync, S: CatalogState + Send + Sync,
F: Fn(&S) -> CheckpointData + Send,
{ {
// empty state // empty state
let object_store = make_object_store(); let object_store = make_object_store();
let catalog = PreservedCatalog::<S>::new_empty( let (catalog, mut state) = PreservedCatalog::<S>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
ServerId::try_from(1).unwrap(), ServerId::try_from(1).unwrap(),
"db1".to_string(), "db1".to_string(),
@ -1411,12 +1425,12 @@ pub mod test_helpers {
.await .await
.unwrap(); .unwrap();
let mut expected = HashMap::new(); let mut expected = HashMap::new();
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// add files // add files
let mut chunk_id_watermark = 5; let mut chunk_id_watermark = 5;
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
for chunk_id in 0..chunk_id_watermark { for chunk_id in 0..chunk_id_watermark {
let path = parsed_path!(format!("chunk_{}", chunk_id).as_ref()); let path = parsed_path!(format!("chunk_{}", chunk_id).as_ref());
@ -1425,25 +1439,25 @@ pub mod test_helpers {
expected.insert(path, Arc::new(metadata)); expected.insert(path, Arc::new(metadata));
} }
transaction.commit(true).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// remove files // remove files
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let path = parsed_path!("chunk_1"); let path = parsed_path!("chunk_1");
transaction.remove_parquet(&path).unwrap(); transaction.remove_parquet(&path).unwrap();
expected.remove(&path); expected.remove(&path);
transaction.commit(true).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// add and remove in the same transaction // add and remove in the same transaction
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
let (_, metadata) = let (_, metadata) =
@ -1452,26 +1466,26 @@ pub mod test_helpers {
transaction.remove_parquet(&path).unwrap(); transaction.remove_parquet(&path).unwrap();
chunk_id_watermark += 1; chunk_id_watermark += 1;
transaction.commit(true).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// remove and add in the same transaction // remove and add in the same transaction
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let path = parsed_path!("chunk_2"); let path = parsed_path!("chunk_2");
let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(2)).await; let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(2)).await;
transaction.remove_parquet(&path).unwrap(); transaction.remove_parquet(&path).unwrap();
transaction.add_parquet(&path, &metadata).unwrap(); transaction.add_parquet(&path, &metadata).unwrap();
transaction.commit(true).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// add, remove, add in the same transaction // add, remove, add in the same transaction
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
let (_, metadata) = let (_, metadata) =
@ -1482,13 +1496,13 @@ pub mod test_helpers {
expected.insert(path, Arc::new(metadata)); expected.insert(path, Arc::new(metadata));
chunk_id_watermark += 1; chunk_id_watermark += 1;
transaction.commit(true).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// remove, add, remove in same transaction // remove, add, remove in same transaction
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let path = parsed_path!("chunk_2"); let path = parsed_path!("chunk_2");
let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(2)).await; let (_, metadata) = make_metadata(&object_store, "ok", chunk_addr(2)).await;
@ -1497,13 +1511,13 @@ pub mod test_helpers {
transaction.remove_parquet(&path).unwrap(); transaction.remove_parquet(&path).unwrap();
expected.remove(&path); expected.remove(&path);
transaction.commit(true).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// error handling, no real opt // error handling, no real opt
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
// already exists (should also not change the metadata) // already exists (should also not change the metadata)
let path = parsed_path!("chunk_0"); let path = parsed_path!("chunk_0");
@ -1517,13 +1531,13 @@ pub mod test_helpers {
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
chunk_id_watermark += 1; chunk_id_watermark += 1;
transaction.commit(true).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// error handling, still something works // error handling, still something works
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
// already exists (should also not change the metadata) // already exists (should also not change the metadata)
let path = parsed_path!("chunk_0"); let path = parsed_path!("chunk_0");
@ -1558,13 +1572,13 @@ pub mod test_helpers {
let err = transaction.remove_parquet(&path).unwrap_err(); let err = transaction.remove_parquet(&path).unwrap_err();
assert!(matches!(err, Error::ParquetFileDoesNotExist { .. })); assert!(matches!(err, Error::ParquetFileDoesNotExist { .. }));
transaction.commit(true).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// transaction aborting // transaction aborting
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(Arc::clone(&state)).await;
// add // add
let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref()); let path = parsed_path!(format!("chunk_{}", chunk_id_watermark).as_ref());
@ -1585,11 +1599,11 @@ pub mod test_helpers {
transaction.remove_parquet(&path).unwrap(); transaction.remove_parquet(&path).unwrap();
chunk_id_watermark += 1; chunk_id_watermark += 1;
} }
assert_files_eq(&catalog.state().files(), &expected); assert_checkpoint(state.as_ref(), &f, &expected);
// transaction aborting w/ errors // transaction aborting w/ errors
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(Arc::clone(&state)).await;
// already exists (should also not change the metadata) // already exists (should also not change the metadata)
let path = parsed_path!("chunk_0"); let path = parsed_path!("chunk_0");
@ -1609,17 +1623,22 @@ pub mod test_helpers {
} }
/// Assert that tracked files and their linked metadata are equal. /// Assert that tracked files and their linked metadata are equal.
fn assert_files_eq( fn assert_checkpoint<S, F>(
actual: &HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>, state: &S,
expected: &HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>, f: &F,
) { expected_files: &HashMap<DirsAndFileName, Arc<IoxParquetMetaData>>,
let sorted_keys_actual = get_sorted_keys(actual); ) where
let sorted_keys_expected = get_sorted_keys(expected); F: Fn(&S) -> CheckpointData,
{
let actual_files = f(state).files;
let sorted_keys_actual = get_sorted_keys(&actual_files);
let sorted_keys_expected = get_sorted_keys(expected_files);
assert_eq!(sorted_keys_actual, sorted_keys_expected); assert_eq!(sorted_keys_actual, sorted_keys_expected);
for k in sorted_keys_actual { for k in sorted_keys_actual {
let md_actual = &actual[&k]; let md_actual = &actual_files[&k];
let md_expected = &expected[&k]; let md_expected = &expected_files[&k];
let iox_md_actual = md_actual.read_iox_metadata().unwrap(); let iox_md_actual = md_actual.read_iox_metadata().unwrap();
let iox_md_expected = md_expected.read_iox_metadata().unwrap(); let iox_md_expected = md_expected.read_iox_metadata().unwrap();
@ -1650,7 +1669,7 @@ pub mod test_helpers {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{num::NonZeroU32, ops::Deref}; use std::num::NonZeroU32;
use crate::test_utils::{chunk_addr, make_metadata, make_object_store}; use crate::test_utils::{chunk_addr, make_metadata, make_object_store};
use object_store::parsed_path; use object_store::parsed_path;
@ -1827,7 +1846,7 @@ mod tests {
assert_single_catalog_inmem_works(&object_store, server_id, db_name).await; assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// break transaction file // break transaction file
let catalog = PreservedCatalog::<TestCatalogState>::load( let (catalog, _state) = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -2155,7 +2174,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_transaction_handle_debug() { async fn test_transaction_handle_debug() {
let object_store = make_object_store(); let object_store = make_object_store();
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
object_store, object_store,
make_server_id(), make_server_id(),
"db1".to_string(), "db1".to_string(),
@ -2163,7 +2182,7 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
let mut t = catalog.open_transaction().await; let mut t = catalog.open_transaction(state).await;
// open transaction // open transaction
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string();
@ -2564,7 +2583,7 @@ mod tests {
let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await; let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// re-open catalog // re-open catalog
let catalog = PreservedCatalog::load( let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -2576,20 +2595,22 @@ mod tests {
// create empty transaction w/ checkpoint (the delta transaction file is not required for catalog loading) // create empty transaction w/ checkpoint (the delta transaction file is not required for catalog loading)
{ {
let transaction = catalog.open_transaction().await; let transaction = catalog.open_transaction(state).await;
transaction.commit(true).await.unwrap(); let checkpoint_data = Some(transaction.tstate().new.checkpoint_data());
state = transaction.commit(checkpoint_data).await.unwrap();
} }
trace.record(&catalog, false); trace.record(&catalog, &state, false);
// create another transaction on-top that adds a file (this transaction will be required to load the full state) // create another transaction on-top that adds a file (this transaction will be required to load the full state)
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
transaction transaction
.add_parquet(&parsed_path!("last_one"), &metadata) .add_parquet(&parsed_path!("last_one"), &metadata)
.unwrap(); .unwrap();
transaction.commit(true).await.unwrap(); let checkpoint_data = Some(transaction.tstate().new.checkpoint_data());
state = transaction.commit(checkpoint_data).await.unwrap();
} }
trace.record(&catalog, false); trace.record(&catalog, &state, false);
// close catalog again // close catalog again
drop(catalog); drop(catalog);
@ -2611,7 +2632,7 @@ mod tests {
} }
// load catalog from store and check replayed state // load catalog from store and check replayed state
let catalog = PreservedCatalog::load( let (catalog, state) = PreservedCatalog::load(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -2625,7 +2646,7 @@ mod tests {
trace.tkeys.last().unwrap().revision_counter trace.tkeys.last().unwrap().revision_counter
); );
assert_catalog_parquet_files( assert_catalog_parquet_files(
&catalog, &state,
&get_catalog_parquet_files(trace.states.last().unwrap()), &get_catalog_parquet_files(trace.states.last().unwrap()),
); );
} }
@ -2643,10 +2664,10 @@ mod tests {
/// Assert that set of parquet files tracked by a catalog are identical to the given sorted list. /// Assert that set of parquet files tracked by a catalog are identical to the given sorted list.
fn assert_catalog_parquet_files( fn assert_catalog_parquet_files(
catalog: &PreservedCatalog<TestCatalogState>, state: &TestCatalogState,
expected: &[(String, IoxParquetMetaData)], expected: &[(String, IoxParquetMetaData)],
) { ) {
let actual = get_catalog_parquet_files(&catalog.state()); let actual = get_catalog_parquet_files(state);
for ((actual_path, actual_md), (expected_path, expected_md)) in for ((actual_path, actual_md), (expected_path, expected_md)) in
actual.iter().zip(expected.iter()) actual.iter().zip(expected.iter())
{ {
@ -2725,10 +2746,15 @@ mod tests {
} }
} }
fn record(&mut self, catalog: &PreservedCatalog<TestCatalogState>, aborted: bool) { fn record(
&mut self,
catalog: &PreservedCatalog<TestCatalogState>,
state: &TestCatalogState,
aborted: bool,
) {
self.tkeys self.tkeys
.push(catalog.inner.read().previous_tkey.clone().unwrap()); .push(catalog.previous_tkey.read().clone().unwrap());
self.states.push(catalog.state().deref().clone()); self.states.push(state.clone());
self.post_timestamps.push(Utc::now()); self.post_timestamps.push(Utc::now());
self.aborted.push(aborted); self.aborted.push(aborted);
} }
@ -2739,7 +2765,7 @@ mod tests {
server_id: ServerId, server_id: ServerId,
db_name: &str, db_name: &str,
) -> TestTrace { ) -> TestTrace {
let catalog = PreservedCatalog::new_empty( let (catalog, mut state) = PreservedCatalog::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -2757,12 +2783,12 @@ mod tests {
// empty catalog has no data // empty catalog has no data
assert_eq!(catalog.revision_counter(), 0); assert_eq!(catalog.revision_counter(), 0);
assert_catalog_parquet_files(&catalog, &[]); assert_catalog_parquet_files(&state, &[]);
trace.record(&catalog, false); trace.record(&catalog, &state, false);
// fill catalog with examples // fill catalog with examples
{ {
let mut t = catalog.open_transaction().await; let mut t = catalog.open_transaction(state).await;
t.add_parquet(&parsed_path!("test1"), &metadata1).unwrap(); t.add_parquet(&parsed_path!("test1"), &metadata1).unwrap();
t.add_parquet(&parsed_path!(["sub1"], "test1"), &metadata2) t.add_parquet(&parsed_path!(["sub1"], "test1"), &metadata2)
@ -2772,11 +2798,11 @@ mod tests {
t.add_parquet(&parsed_path!(["sub2"], "test1"), &metadata1) t.add_parquet(&parsed_path!(["sub2"], "test1"), &metadata1)
.unwrap(); .unwrap();
t.commit(false).await.unwrap(); state = t.commit(None).await.unwrap();
} }
assert_eq!(catalog.revision_counter(), 1); assert_eq!(catalog.revision_counter(), 1);
assert_catalog_parquet_files( assert_catalog_parquet_files(
&catalog, &state,
&[ &[
("sub1/test1".to_string(), metadata2.clone()), ("sub1/test1".to_string(), metadata2.clone()),
("sub1/test2".to_string(), metadata2.clone()), ("sub1/test2".to_string(), metadata2.clone()),
@ -2784,11 +2810,11 @@ mod tests {
("test1".to_string(), metadata1.clone()), ("test1".to_string(), metadata1.clone()),
], ],
); );
trace.record(&catalog, false); trace.record(&catalog, &state, false);
// modify catalog with examples // modify catalog with examples
{ {
let mut t = catalog.open_transaction().await; let mut t = catalog.open_transaction(state).await;
// "real" modifications // "real" modifications
t.add_parquet(&parsed_path!("test4"), &metadata1).unwrap(); t.add_parquet(&parsed_path!("test4"), &metadata1).unwrap();
@ -2802,11 +2828,11 @@ mod tests {
t.remove_parquet(&parsed_path!("test1")) t.remove_parquet(&parsed_path!("test1"))
.expect_err("removing twice should error"); .expect_err("removing twice should error");
t.commit(false).await.unwrap(); state = t.commit(None).await.unwrap();
} }
assert_eq!(catalog.revision_counter(), 2); assert_eq!(catalog.revision_counter(), 2);
assert_catalog_parquet_files( assert_catalog_parquet_files(
&catalog, &state,
&[ &[
("sub1/test1".to_string(), metadata2.clone()), ("sub1/test1".to_string(), metadata2.clone()),
("sub1/test2".to_string(), metadata2.clone()), ("sub1/test2".to_string(), metadata2.clone()),
@ -2814,11 +2840,11 @@ mod tests {
("test4".to_string(), metadata1.clone()), ("test4".to_string(), metadata1.clone()),
], ],
); );
trace.record(&catalog, false); trace.record(&catalog, &state, false);
// uncommitted modifications have no effect // uncommitted modifications have no effect
{ {
let mut t = catalog.open_transaction().await; let mut t = catalog.open_transaction(Arc::clone(&state)).await;
t.add_parquet(&parsed_path!("test5"), &metadata1).unwrap(); t.add_parquet(&parsed_path!("test5"), &metadata1).unwrap();
t.remove_parquet(&parsed_path!(["sub1"], "test2")).unwrap(); t.remove_parquet(&parsed_path!(["sub1"], "test2")).unwrap();
@ -2827,7 +2853,7 @@ mod tests {
} }
assert_eq!(catalog.revision_counter(), 2); assert_eq!(catalog.revision_counter(), 2);
assert_catalog_parquet_files( assert_catalog_parquet_files(
&catalog, &state,
&[ &[
("sub1/test1".to_string(), metadata2.clone()), ("sub1/test1".to_string(), metadata2.clone()),
("sub1/test2".to_string(), metadata2.clone()), ("sub1/test2".to_string(), metadata2.clone()),
@ -2835,7 +2861,7 @@ mod tests {
("test4".to_string(), metadata1.clone()), ("test4".to_string(), metadata1.clone()),
], ],
); );
trace.record(&catalog, true); trace.record(&catalog, &state, true);
trace trace
} }
@ -2923,7 +2949,7 @@ mod tests {
assert_single_catalog_inmem_works(&object_store, make_server_id(), "db1").await; assert_single_catalog_inmem_works(&object_store, make_server_id(), "db1").await;
// break // break
let catalog = PreservedCatalog::<TestCatalogState>::load( let (catalog, _state) = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -2999,7 +3025,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_transaction_handle_revision_counter() { async fn test_transaction_handle_revision_counter() {
let object_store = make_object_store(); let object_store = make_object_store();
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
object_store, object_store,
make_server_id(), make_server_id(),
"db1".to_string(), "db1".to_string(),
@ -3007,7 +3033,7 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
let t = catalog.open_transaction().await; let t = catalog.open_transaction(state).await;
assert_eq!(t.revision_counter(), 1); assert_eq!(t.revision_counter(), 1);
} }
@ -3015,7 +3041,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_transaction_handle_uuid() { async fn test_transaction_handle_uuid() {
let object_store = make_object_store(); let object_store = make_object_store();
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
object_store, object_store,
make_server_id(), make_server_id(),
"db1".to_string(), "db1".to_string(),
@ -3023,7 +3049,7 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
let mut t = catalog.open_transaction().await; let mut t = catalog.open_transaction(state).await;
t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string(); t.transaction.as_mut().unwrap().proto.uuid = Uuid::nil().to_string();
assert_eq!(t.uuid(), Uuid::nil()); assert_eq!(t.uuid(), Uuid::nil());
@ -3180,7 +3206,7 @@ mod tests {
let db_name = "db1"; let db_name = "db1";
let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await; let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
let catalog = PreservedCatalog::load( let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -3192,10 +3218,11 @@ mod tests {
// create empty transaction w/ checkpoint // create empty transaction w/ checkpoint
{ {
let transaction = catalog.open_transaction().await; let transaction = catalog.open_transaction(state).await;
transaction.commit(true).await.unwrap(); let checkpoint_data = Some(transaction.tstate().new.checkpoint_data());
state = transaction.commit(checkpoint_data).await.unwrap();
} }
trace.record(&catalog, false); trace.record(&catalog, &state, false);
// delete transaction files // delete transaction files
for (aborted, tkey) in trace.aborted.iter().zip(trace.tkeys.iter()) { for (aborted, tkey) in trace.aborted.iter().zip(trace.tkeys.iter()) {
@ -3246,7 +3273,7 @@ mod tests {
let trace = assert_single_catalog_inmem_works(object_store, server_id, db_name).await; let trace = assert_single_catalog_inmem_works(object_store, server_id, db_name).await;
// load catalog from store and check replayed state // load catalog from store and check replayed state
let catalog = let (catalog, state) =
PreservedCatalog::load(Arc::clone(object_store), server_id, db_name.to_string(), ()) PreservedCatalog::load(Arc::clone(object_store), server_id, db_name.to_string(), ())
.await .await
.unwrap() .unwrap()
@ -3256,7 +3283,7 @@ mod tests {
trace.tkeys.last().unwrap().revision_counter trace.tkeys.last().unwrap().revision_counter
); );
assert_catalog_parquet_files( assert_catalog_parquet_files(
&catalog, &state,
&get_catalog_parquet_files(trace.states.last().unwrap()), &get_catalog_parquet_files(trace.states.last().unwrap()),
); );
} }
@ -3273,7 +3300,7 @@ mod tests {
.unwrap() .unwrap()
); );
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -3283,7 +3310,7 @@ mod tests {
.unwrap(); .unwrap();
// delete transaction file // delete transaction file
let tkey = catalog.inner.read().previous_tkey.clone().unwrap(); let tkey = catalog.previous_tkey.read().clone().unwrap();
let path = file_path( let path = file_path(
&object_store, &object_store,
server_id, server_id,
@ -3295,12 +3322,13 @@ mod tests {
// create empty transaction w/ checkpoint // create empty transaction w/ checkpoint
{ {
let transaction = catalog.open_transaction().await; let transaction = catalog.open_transaction(state).await;
transaction.commit(true).await.unwrap(); let checkpoint_data = Some(transaction.tstate().new.checkpoint_data());
transaction.commit(checkpoint_data).await.unwrap();
} }
// delete transaction file // delete transaction file
let tkey = catalog.inner.read().previous_tkey.clone().unwrap(); let tkey = catalog.previous_tkey.read().clone().unwrap();
let path = file_path( let path = file_path(
&object_store, &object_store,
server_id, server_id,
@ -3330,6 +3358,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_catalog_state() { async fn test_catalog_state() {
assert_catalog_state_implementation::<TestCatalogState>(()).await; assert_catalog_state_implementation::<TestCatalogState, _>(
(),
TestCatalogState::checkpoint_data,
)
.await;
} }
} }

View File

@ -1,13 +1,12 @@
//! Methods to cleanup the object store. //! Methods to cleanup the object store.
use std::{ use std::{
collections::{HashMap, HashSet}, collections::HashSet,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
use crate::{ use crate::{
catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog, TransactionEnd}, catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog, TransactionEnd},
metadata::IoxParquetMetaData,
storage::data_location, storage::data_location,
}; };
use futures::TryStreamExt; use futures::TryStreamExt;
@ -42,6 +41,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// use `max_files` which will limit the number of files to delete in this cleanup round. /// use `max_files` which will limit the number of files to delete in this cleanup round.
pub async fn cleanup_unreferenced_parquet_files<S>( pub async fn cleanup_unreferenced_parquet_files<S>(
catalog: &PreservedCatalog<S>, catalog: &PreservedCatalog<S>,
state: Arc<S>,
max_files: usize, max_files: usize,
) -> Result<()> ) -> Result<()>
where where
@ -49,14 +49,14 @@ where
{ {
// Create a transaction to prevent parallel modifications of the catalog. This avoids that we delete files there // Create a transaction to prevent parallel modifications of the catalog. This avoids that we delete files there
// that are about to get added to the catalog. // that are about to get added to the catalog.
let transaction = catalog.open_transaction().await; let transaction = catalog.open_transaction(state).await;
let store = catalog.object_store(); let store = catalog.object_store();
let server_id = catalog.server_id(); let server_id = catalog.server_id();
let db_name = catalog.db_name(); let db_name = catalog.db_name();
let all_known = { let all_known = {
// replay catalog transactions to track ALL (even dropped) files that are referenced // replay catalog transactions to track ALL (even dropped) files that are referenced
let catalog = PreservedCatalog::<TracerCatalogState>::load( let (_catalog, state) = PreservedCatalog::<TracerCatalogState>::load(
Arc::clone(&store), Arc::clone(&store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -65,12 +65,9 @@ where
.await .await
.context(CatalogLoadError)? .context(CatalogLoadError)?
.expect("catalog gone while reading it?"); .expect("catalog gone while reading it?");
catalog
.state() let file_guard = state.files.lock().expect("lock poissened?");
.files file_guard.clone()
.lock()
.expect("lock poissened?")
.clone()
}; };
let prefix = data_location(&store, server_id, db_name); let prefix = data_location(&store, server_id, db_name);
@ -165,10 +162,6 @@ impl CatalogState for TracerCatalogState {
// Do NOT remove the file since we still need it for time travel // Do NOT remove the file since we still need it for time travel
Ok(()) Ok(())
} }
fn files(&self) -> HashMap<DirsAndFileName, Arc<IoxParquetMetaData>> {
unimplemented!("File tracking not implemented for TracerCatalogState")
}
} }
#[cfg(test)] #[cfg(test)]
@ -191,7 +184,7 @@ mod tests {
let server_id = make_server_id(); let server_id = make_server_id();
let db_name = "db1"; let db_name = "db1";
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name, db_name,
@ -201,7 +194,7 @@ mod tests {
.unwrap(); .unwrap();
// run clean-up // run clean-up
cleanup_unreferenced_parquet_files(&catalog, 1_000) cleanup_unreferenced_parquet_files(&catalog, state, 1_000)
.await .await
.unwrap(); .unwrap();
} }
@ -212,7 +205,7 @@ mod tests {
let server_id = make_server_id(); let server_id = make_server_id();
let db_name = db_name(); let db_name = db_name();
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name, db_name,
@ -225,7 +218,7 @@ mod tests {
let mut paths_keep = vec![]; let mut paths_keep = vec![];
let mut paths_delete = vec![]; let mut paths_delete = vec![];
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
// an ordinary tracked parquet file => keep // an ordinary tracked parquet file => keep
let (path, md) = make_metadata(&object_store, "foo", chunk_addr(1)).await; let (path, md) = make_metadata(&object_store, "foo", chunk_addr(1)).await;
@ -249,11 +242,11 @@ mod tests {
let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(3)).await; let (path, _md) = make_metadata(&object_store, "foo", chunk_addr(3)).await;
paths_delete.push(path.display()); paths_delete.push(path.display());
transaction.commit(false).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
// run clean-up // run clean-up
cleanup_unreferenced_parquet_files(&catalog, 1_000) cleanup_unreferenced_parquet_files(&catalog, state, 1_000)
.await .await
.unwrap(); .unwrap();
@ -273,7 +266,7 @@ mod tests {
let server_id = make_server_id(); let server_id = make_server_id();
let db_name = db_name(); let db_name = db_name();
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name, db_name,
@ -286,17 +279,17 @@ mod tests {
for i in 0..100 { for i in 0..100 {
let (path, _) = tokio::join!( let (path, _) = tokio::join!(
async { async {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(Arc::clone(&state)).await;
let (path, md) = make_metadata(&object_store, "foo", chunk_addr(i)).await; let (path, md) = make_metadata(&object_store, "foo", chunk_addr(i)).await;
transaction.add_parquet(&path.clone().into(), &md).unwrap(); transaction.add_parquet(&path.clone().into(), &md).unwrap();
transaction.commit(false).await.unwrap(); transaction.commit(None).await.unwrap();
path.display() path.display()
}, },
async { async {
cleanup_unreferenced_parquet_files(&catalog, 1_000) cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 1_000)
.await .await
.unwrap(); .unwrap();
}, },
@ -313,7 +306,7 @@ mod tests {
let server_id = make_server_id(); let server_id = make_server_id();
let db_name = db_name(); let db_name = db_name();
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name, db_name,
@ -330,7 +323,7 @@ mod tests {
} }
// run clean-up // run clean-up
cleanup_unreferenced_parquet_files(&catalog, 2) cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 2)
.await .await
.unwrap(); .unwrap();
@ -340,7 +333,7 @@ mod tests {
assert_eq!(leftover.len(), 1); assert_eq!(leftover.len(), 1);
// run clean-up again // run clean-up again
cleanup_unreferenced_parquet_files(&catalog, 2) cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 2)
.await .await
.unwrap(); .unwrap();

View File

@ -15,7 +15,7 @@ use snafu::{ResultExt, Snafu};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
catalog::{CatalogState, PreservedCatalog}, catalog::{CatalogState, CheckpointData, PreservedCatalog},
metadata::{IoxMetadata, IoxParquetMetaData}, metadata::{IoxMetadata, IoxParquetMetaData},
}; };
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
@ -96,57 +96,63 @@ pub async fn rebuild_catalog<S, N>(
db_name: N, db_name: N,
catalog_empty_input: S::EmptyInput, catalog_empty_input: S::EmptyInput,
ignore_metadata_read_failure: bool, ignore_metadata_read_failure: bool,
) -> Result<PreservedCatalog<S>> ) -> Result<(PreservedCatalog<S>, Arc<S>)>
where where
S: CatalogState + Send + Sync, S: CatalogState + Send + Sync,
N: Into<String> + Send, N: Into<String> + Send,
{ {
// collect all revisions from parquet files // collect all revisions from parquet files
let revisions = let mut revisions =
collect_revisions(&object_store, search_location, ignore_metadata_read_failure).await?; collect_revisions(&object_store, search_location, ignore_metadata_read_failure).await?;
// create new empty catalog // create new empty catalog
let catalog = let (catalog, mut state) =
PreservedCatalog::<S>::new_empty(object_store, server_id, db_name, catalog_empty_input) PreservedCatalog::<S>::new_empty(object_store, server_id, db_name, catalog_empty_input)
.await .await
.context(NewEmptyFailure)?; .context(NewEmptyFailure)?;
// trace all files for final checkpoint
let mut collected_files = HashMap::new();
// simulate all transactions // simulate all transactions
if let Some(max_revision) = revisions.keys().max() { if let Some(max_revision) = revisions.keys().max().cloned() {
for revision_counter in 1..=*max_revision { for revision_counter in 1..=max_revision {
assert_eq!( assert_eq!(
catalog.revision_counter() + 1, catalog.revision_counter() + 1,
revision_counter, revision_counter,
"revision counter during transaction simulation out-of-sync" "revision counter during transaction simulation out-of-sync"
); );
let create_checkpoint = revision_counter == *max_revision;
if let Some((uuid, entries)) = revisions.get(&revision_counter) { if let Some((uuid, entries)) = revisions.remove(&revision_counter) {
// we have files for this particular transaction // we have files for this particular transaction
let mut transaction = catalog.open_transaction_with_uuid(*uuid).await; let mut transaction = catalog.open_transaction_with_uuid(uuid, state).await;
for (path, metadata) in entries { for (path, metadata) in entries {
let path: DirsAndFileName = path.clone().into(); let path: DirsAndFileName = path.clone().into();
transaction transaction
.add_parquet(&path, metadata) .add_parquet(&path, &metadata)
.context(FileRecordFailure)?; .context(FileRecordFailure)?;
collected_files.insert(path, Arc::new(metadata));
} }
transaction
.commit(create_checkpoint) let checkpoint_data = (revision_counter == max_revision).then(|| CheckpointData {
files: collected_files.clone(),
});
state = transaction
.commit(checkpoint_data)
.await .await
.context(CommitFailure)?; .context(CommitFailure)?;
} else { } else {
// we do not have any files for this transaction (there might have been other actions though or it was // we do not have any files for this transaction (there might have been other actions though or it was
// an empty transaction) => create new empty transaction // an empty transaction) => create new empty transaction
let transaction = catalog.open_transaction().await; // Note that this can never be the last transaction, so we don't need to create a checkpoint here.
transaction let transaction = catalog.open_transaction(state).await;
.commit(create_checkpoint) state = transaction.commit(None).await.context(CommitFailure)?;
.await
.context(CommitFailure)?;
} }
} }
} }
Ok(catalog) Ok((catalog, state))
} }
/// Collect all files under the given locations. /// Collect all files under the given locations.
@ -273,7 +279,7 @@ mod tests {
let db_name = "db1"; let db_name = "db1";
// build catalog with some data // build catalog with some data
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name, db_name,
@ -282,7 +288,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -306,15 +312,15 @@ mod tests {
.await; .await;
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
transaction.commit(false).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
{ {
// empty transaction // empty transaction
let transaction = catalog.open_transaction().await; let transaction = catalog.open_transaction(state).await;
transaction.commit(false).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -327,12 +333,11 @@ mod tests {
.await; .await;
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
transaction.commit(false).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
// store catalog state // store catalog state
let paths_expected = { let paths_expected = {
let state = catalog.state();
let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect(); let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
tmp.sort(); tmp.sort();
tmp tmp
@ -344,7 +349,7 @@ mod tests {
// rebuild // rebuild
let path = object_store.new_path(); let path = object_store.new_path();
let catalog = rebuild_catalog::<TestCatalogState, _>( let (catalog, state) = rebuild_catalog::<TestCatalogState, _>(
object_store, object_store,
&path, &path,
server_id, server_id,
@ -357,7 +362,6 @@ mod tests {
// check match // check match
let paths_actual = { let paths_actual = {
let state = catalog.state();
let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect(); let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
tmp.sort(); tmp.sort();
tmp tmp
@ -373,7 +377,7 @@ mod tests {
let db_name = "db1"; let db_name = "db1";
// build empty catalog // build empty catalog
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, _state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name, db_name,
@ -388,7 +392,7 @@ mod tests {
// rebuild // rebuild
let path = object_store.new_path(); let path = object_store.new_path();
let catalog = rebuild_catalog::<TestCatalogState, _>( let (catalog, state) = rebuild_catalog::<TestCatalogState, _>(
object_store, object_store,
&path, &path,
server_id, server_id,
@ -400,7 +404,6 @@ mod tests {
.unwrap(); .unwrap();
// check match // check match
let state = catalog.state();
assert!(state.parquet_files.is_empty()); assert!(state.parquet_files.is_empty());
assert_eq!(catalog.revision_counter(), 0); assert_eq!(catalog.revision_counter(), 0);
} }
@ -451,7 +454,7 @@ mod tests {
let db_name = "db1"; let db_name = "db1";
// build catalog with same data // build catalog with same data
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name, db_name,
@ -460,7 +463,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -484,7 +487,7 @@ mod tests {
) )
.await; .await;
transaction.commit(false).await.unwrap(); transaction.commit(None).await.unwrap();
} }
// wipe catalog // wipe catalog
@ -544,7 +547,7 @@ mod tests {
.starts_with("Cannot read IOx metadata from parquet file")); .starts_with("Cannot read IOx metadata from parquet file"));
// rebuild (ignore errors) // rebuild (ignore errors)
let catalog = rebuild_catalog::<TestCatalogState, _>( let (catalog, state) = rebuild_catalog::<TestCatalogState, _>(
object_store, object_store,
&path, &path,
server_id, server_id,
@ -554,7 +557,6 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
let state = catalog.state();
assert!(state.parquet_files.is_empty()); assert!(state.parquet_files.is_empty());
assert_eq!(catalog.revision_counter(), 0); assert_eq!(catalog.revision_counter(), 0);
} }
@ -566,7 +568,7 @@ mod tests {
let db_name = "db1"; let db_name = "db1";
// build catalog with some data (2 transactions + initial empty one) // build catalog with some data (2 transactions + initial empty one)
let catalog = PreservedCatalog::<TestCatalogState>::new_empty( let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name, db_name,
@ -575,7 +577,7 @@ mod tests {
.await .await
.unwrap(); .unwrap();
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -588,10 +590,10 @@ mod tests {
.await; .await;
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
transaction.commit(false).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = catalog.open_transaction(state).await;
let (path, md) = create_parquet_file( let (path, md) = create_parquet_file(
&object_store, &object_store,
@ -604,13 +606,12 @@ mod tests {
.await; .await;
transaction.add_parquet(&path, &md).unwrap(); transaction.add_parquet(&path, &md).unwrap();
transaction.commit(false).await.unwrap(); state = transaction.commit(None).await.unwrap();
} }
assert_eq!(catalog.revision_counter(), 2); assert_eq!(catalog.revision_counter(), 2);
// store catalog state // store catalog state
let paths_expected = { let paths_expected = {
let state = catalog.state();
let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect(); let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
tmp.sort(); tmp.sort();
tmp tmp
@ -656,7 +657,7 @@ mod tests {
assert!(deleted); assert!(deleted);
// load catalog // load catalog
let catalog = PreservedCatalog::<TestCatalogState>::load( let (catalog, state) = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
db_name.to_string(), db_name.to_string(),
@ -668,7 +669,6 @@ mod tests {
// check match // check match
let paths_actual = { let paths_actual = {
let state = catalog.state();
let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect(); let mut tmp: Vec<_> = state.parquet_files.keys().cloned().collect();
tmp.sort(); tmp.sort();
tmp tmp

View File

@ -238,6 +238,7 @@ impl Config {
object_store: Arc<ObjectStore>, object_store: Arc<ObjectStore>,
exec: Arc<Executor>, exec: Arc<Executor>,
preserved_catalog: PreservedCatalog<Catalog>, preserved_catalog: PreservedCatalog<Catalog>,
catalog: Arc<Catalog>,
) { ) {
let mut state = self.state.write().expect("mutex poisoned"); let mut state = self.state.write().expect("mutex poisoned");
let name = rules.name.clone(); let name = rules.name.clone();
@ -263,6 +264,7 @@ impl Config {
exec, exec,
Arc::clone(&self.jobs), Arc::clone(&self.jobs),
preserved_catalog, preserved_catalog,
catalog,
write_buffer, write_buffer,
)); ));
@ -451,6 +453,7 @@ impl<'a> CreateDatabaseHandle<'a> {
object_store: Arc<ObjectStore>, object_store: Arc<ObjectStore>,
exec: Arc<Executor>, exec: Arc<Executor>,
preserved_catalog: PreservedCatalog<Catalog>, preserved_catalog: PreservedCatalog<Catalog>,
catalog: Arc<Catalog>,
rules: DatabaseRules, rules: DatabaseRules,
) -> Result<()> { ) -> Result<()> {
let db_name = self.db_name.take().expect("not committed"); let db_name = self.db_name.take().expect("not committed");
@ -462,8 +465,14 @@ impl<'a> CreateDatabaseHandle<'a> {
}); });
} }
self.config self.config.commit_db(
.commit_db(rules, server_id, object_store, exec, preserved_catalog); rules,
server_id,
object_store,
exec,
preserved_catalog,
catalog,
);
Ok(()) Ok(())
} }
@ -531,6 +540,7 @@ impl<'a> RecoverDatabaseHandle<'a> {
object_store: Arc<ObjectStore>, object_store: Arc<ObjectStore>,
exec: Arc<Executor>, exec: Arc<Executor>,
preserved_catalog: PreservedCatalog<Catalog>, preserved_catalog: PreservedCatalog<Catalog>,
catalog: Arc<Catalog>,
rules: Option<DatabaseRules>, rules: Option<DatabaseRules>,
) -> Result<()> { ) -> Result<()> {
let db_name = self.db_name.take().expect("not committed"); let db_name = self.db_name.take().expect("not committed");
@ -547,8 +557,14 @@ impl<'a> RecoverDatabaseHandle<'a> {
}); });
} }
self.config self.config.commit_db(
.commit_db(rules, server_id, object_store, exec, preserved_catalog); rules,
server_id,
object_store,
exec,
preserved_catalog,
catalog,
);
Ok(()) Ok(())
} }
@ -623,7 +639,7 @@ mod test {
let server_id = ServerId::try_from(1).unwrap(); let server_id = ServerId::try_from(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let exec = Arc::new(Executor::new(1)); let exec = Arc::new(Executor::new(1));
let preserved_catalog = load_or_create_preserved_catalog( let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&name, &name,
Arc::clone(&store), Arc::clone(&store),
server_id, server_id,
@ -642,13 +658,14 @@ mod test {
Arc::clone(&store), Arc::clone(&store),
Arc::clone(&exec), Arc::clone(&exec),
preserved_catalog, preserved_catalog,
catalog,
rules.clone(), rules.clone(),
) )
.unwrap_err(); .unwrap_err();
assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. })); assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. }));
} }
let preserved_catalog = load_or_create_preserved_catalog( let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&name, &name,
Arc::clone(&store), Arc::clone(&store),
server_id, server_id,
@ -659,7 +676,7 @@ mod test {
.unwrap(); .unwrap();
let db_reservation = config.create_db(name.clone()).unwrap(); let db_reservation = config.create_db(name.clone()).unwrap();
db_reservation db_reservation
.commit_db(server_id, store, exec, preserved_catalog, rules) .commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
.unwrap(); .unwrap();
assert!(config.db(&name).is_some()); assert!(config.db(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]); assert_eq!(config.db_names_sorted(), vec![name.clone()]);
@ -733,7 +750,7 @@ mod test {
let server_id = ServerId::try_from(1).unwrap(); let server_id = ServerId::try_from(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let exec = Arc::new(Executor::new(1)); let exec = Arc::new(Executor::new(1));
let preserved_catalog = load_or_create_preserved_catalog( let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&name, &name,
Arc::clone(&store), Arc::clone(&store),
server_id, server_id,
@ -750,13 +767,14 @@ mod test {
Arc::clone(&store), Arc::clone(&store),
Arc::clone(&exec), Arc::clone(&exec),
preserved_catalog, preserved_catalog,
catalog,
Some(DatabaseRules::new(DatabaseName::new("bar").unwrap())), Some(DatabaseRules::new(DatabaseName::new("bar").unwrap())),
) )
.unwrap_err(); .unwrap_err();
assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. })); assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. }));
} }
let preserved_catalog = load_or_create_preserved_catalog( let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&name, &name,
Arc::clone(&store), Arc::clone(&store),
server_id, server_id,
@ -768,7 +786,7 @@ mod test {
let db_reservation = config.recover_db(name.clone()).unwrap(); let db_reservation = config.recover_db(name.clone()).unwrap();
assert!(db_reservation.has_rules()); assert!(db_reservation.has_rules());
db_reservation db_reservation
.commit_db(server_id, store, exec, preserved_catalog, None) .commit_db(server_id, store, exec, preserved_catalog, catalog, None)
.unwrap(); .unwrap();
assert!(config.db(&name).is_some()); assert!(config.db(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]); assert_eq!(config.db_names_sorted(), vec![name.clone()]);
@ -829,7 +847,7 @@ mod test {
let server_id = ServerId::try_from(1).unwrap(); let server_id = ServerId::try_from(1).unwrap();
let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); let store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
let exec = Arc::new(Executor::new(1)); let exec = Arc::new(Executor::new(1));
let preserved_catalog = load_or_create_preserved_catalog( let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&name, &name,
Arc::clone(&store), Arc::clone(&store),
server_id, server_id,
@ -839,7 +857,7 @@ mod test {
.await .await
.unwrap(); .unwrap();
db_reservation db_reservation
.commit_db(server_id, store, exec, preserved_catalog, rules) .commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
.unwrap(); .unwrap();
let token = config let token = config

View File

@ -30,8 +30,7 @@ use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use object_store::{path::parsed::DirsAndFileName, ObjectStore}; use object_store::{path::parsed::DirsAndFileName, ObjectStore};
use observability_deps::tracing::{debug, error, info, warn}; use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::RwLock; use parking_lot::RwLock;
use parquet_file::catalog::TransactionEnd; use parquet_file::catalog::{CheckpointData, TransactionEnd};
use parquet_file::metadata::IoxParquetMetaData;
use parquet_file::{ use parquet_file::{
catalog::{ catalog::{
wipe as wipe_preserved_catalog, CatalogParquetInfo, CatalogState, ChunkCreationFailed, wipe as wipe_preserved_catalog, CatalogParquetInfo, CatalogState, ChunkCreationFailed,
@ -221,6 +220,9 @@ pub struct Db {
/// Executor for running queries /// Executor for running queries
exec: Arc<Executor>, exec: Arc<Executor>,
/// Preserved catalog (data in object store).
preserved_catalog: Arc<PreservedCatalog<Catalog>>,
/// The catalog holds chunks of data under partitions for the database. /// The catalog holds chunks of data under partitions for the database.
/// The underlying chunks may be backed by different execution engines /// The underlying chunks may be backed by different execution engines
/// depending on their stage in the data lifecycle. Currently there are /// depending on their stage in the data lifecycle. Currently there are
@ -230,7 +232,7 @@ pub struct Db {
/// - The Read Buffer where chunks are immutable and stored in an optimised /// - The Read Buffer where chunks are immutable and stored in an optimised
/// compressed form for small footprint and fast query execution; and /// compressed form for small footprint and fast query execution; and
/// - The Parquet Buffer where chunks are backed by Parquet file data. /// - The Parquet Buffer where chunks are backed by Parquet file data.
preserved_catalog: Arc<PreservedCatalog<Catalog>>, catalog: Arc<Catalog>,
/// A handle to the global jobs registry for long running tasks /// A handle to the global jobs registry for long running tasks
jobs: Arc<JobRegistry>, jobs: Arc<JobRegistry>,
@ -270,7 +272,7 @@ pub async fn load_or_create_preserved_catalog(
server_id: ServerId, server_id: ServerId,
metrics_registry: Arc<MetricRegistry>, metrics_registry: Arc<MetricRegistry>,
wipe_on_error: bool, wipe_on_error: bool,
) -> std::result::Result<PreservedCatalog<Catalog>, parquet_file::catalog::Error> { ) -> std::result::Result<(PreservedCatalog<Catalog>, Arc<Catalog>), parquet_file::catalog::Error> {
let metric_labels = vec![ let metric_labels = vec![
KeyValue::new("db_name", db_name.to_string()), KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("svr_id", format!("{}", server_id)), KeyValue::new("svr_id", format!("{}", server_id)),
@ -347,6 +349,7 @@ pub async fn load_or_create_preserved_catalog(
} }
impl Db { impl Db {
#[allow(clippy::clippy::too_many_arguments)]
pub fn new( pub fn new(
rules: DatabaseRules, rules: DatabaseRules,
server_id: ServerId, server_id: ServerId,
@ -354,6 +357,7 @@ impl Db {
exec: Arc<Executor>, exec: Arc<Executor>,
jobs: Arc<JobRegistry>, jobs: Arc<JobRegistry>,
preserved_catalog: PreservedCatalog<Catalog>, preserved_catalog: PreservedCatalog<Catalog>,
catalog: Arc<Catalog>,
write_buffer: Option<Arc<dyn WriteBuffer>>, write_buffer: Option<Arc<dyn WriteBuffer>>,
) -> Self { ) -> Self {
let db_name = rules.name.clone(); let db_name = rules.name.clone();
@ -361,12 +365,12 @@ impl Db {
let rules = RwLock::new(rules); let rules = RwLock::new(rules);
let server_id = server_id; let server_id = server_id;
let store = Arc::clone(&object_store); let store = Arc::clone(&object_store);
let metrics_registry = Arc::clone(&preserved_catalog.state().metrics_registry); let metrics_registry = Arc::clone(&catalog.metrics_registry);
let metric_labels = preserved_catalog.state().metric_labels.clone(); let metric_labels = catalog.metric_labels.clone();
let catalog_access = QueryCatalogAccess::new( let catalog_access = QueryCatalogAccess::new(
&db_name, &db_name,
preserved_catalog.state(), Arc::clone(&catalog),
Arc::clone(&jobs), Arc::clone(&jobs),
Arc::clone(&metrics_registry), Arc::clone(&metrics_registry),
metric_labels.clone(), metric_labels.clone(),
@ -381,6 +385,7 @@ impl Db {
store, store,
exec, exec,
preserved_catalog: Arc::new(preserved_catalog), preserved_catalog: Arc::new(preserved_catalog),
catalog,
jobs, jobs,
metrics_registry, metrics_registry,
catalog_access, catalog_access,
@ -424,10 +429,7 @@ impl Db {
table_name: &str, table_name: &str,
partition_key: &str, partition_key: &str,
) -> catalog::Result<Arc<tracker::RwLock<Partition>>> { ) -> catalog::Result<Arc<tracker::RwLock<Partition>>> {
let partition = self let partition = self.catalog.partition(table_name, partition_key)?;
.preserved_catalog
.state()
.partition(table_name, partition_key)?;
Ok(Arc::clone(&partition)) Ok(Arc::clone(&partition))
} }
@ -437,9 +439,7 @@ impl Db {
partition_key: &str, partition_key: &str,
chunk_id: u32, chunk_id: u32,
) -> catalog::Result<Arc<tracker::RwLock<CatalogChunk>>> { ) -> catalog::Result<Arc<tracker::RwLock<CatalogChunk>>> {
self.preserved_catalog self.catalog.chunk(table_name, partition_key, chunk_id)
.state()
.chunk(table_name, partition_key, chunk_id)
} }
pub fn lockable_chunk( pub fn lockable_chunk(
@ -554,14 +554,7 @@ impl Db {
let mut rb_chunk = RBChunk::new( let mut rb_chunk = RBChunk::new(
&table_summary.name, &table_summary.name,
ReadBufferChunkMetrics::new( ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
&metrics,
db.preserved_catalog
.state()
.metrics()
.memory()
.read_buffer(),
),
); );
let fut = async move { let fut = async move {
@ -654,7 +647,8 @@ impl Db {
.lifecycle_rules .lifecycle_rules
.catalog_transactions_until_checkpoint; .catalog_transactions_until_checkpoint;
let catalog = Arc::clone(&db.preserved_catalog); let preserved_catalog = Arc::clone(&db.preserved_catalog);
let catalog = Arc::clone(&db.catalog);
let fut = async move { let fut = async move {
let table_name = table_summary.name.as_str(); let table_name = table_summary.name.as_str();
@ -676,7 +670,7 @@ impl Db {
// catalog-level transaction for preservation layer // catalog-level transaction for preservation layer
{ {
let mut transaction = catalog.open_transaction().await; let mut transaction = preserved_catalog.open_transaction(catalog).await;
// Write this table data into the object store // Write this table data into the object store
// //
@ -701,18 +695,28 @@ impl Db {
) )
.await .await
.context(WritingToObjectStore)?; .context(WritingToObjectStore)?;
let path: DirsAndFileName = path.into();
transaction transaction
.add_parquet(&path.into(), &parquet_metadata) .add_parquet(&path, &parquet_metadata)
.context(TransactionError)?; .context(TransactionError)?;
let create_checkpoint = catalog_transactions_until_checkpoint let create_checkpoint = catalog_transactions_until_checkpoint
.map_or(false, |interval| { .map_or(false, |interval| {
transaction.revision_counter() % interval.get() == 0 transaction.revision_counter() % interval.get() == 0
}); });
let checkpoint_data = create_checkpoint.then(|| {
let mut checkpoint_data =
checkpoint_data_from_catalog(&transaction.tstate().catalog);
// don't forget the file that we've just added
checkpoint_data
.files
.insert(path, Arc::new(parquet_metadata));
checkpoint_data
});
transaction transaction
.commit(create_checkpoint) .commit(checkpoint_data)
.await .await
.context(TransactionError)?; .context(TransactionError)?;
} }
@ -756,11 +760,8 @@ impl Db {
pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> { pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {
let partition_key = Some(partition_key); let partition_key = Some(partition_key);
let table_names = TableNameFilter::AllTables; let table_names = TableNameFilter::AllTables;
self.preserved_catalog.state().filtered_chunks( self.catalog
table_names, .filtered_chunks(table_names, partition_key, CatalogChunk::summary)
partition_key,
CatalogChunk::summary,
)
} }
/// Return Summary information for all columns in all chunks in the /// Return Summary information for all columns in all chunks in the
@ -770,8 +771,7 @@ impl Db {
table_name: &str, table_name: &str,
partition_key: &str, partition_key: &str,
) -> Option<PartitionSummary> { ) -> Option<PartitionSummary> {
self.preserved_catalog self.catalog
.state()
.partition(table_name, partition_key) .partition(table_name, partition_key)
.map(|partition| partition.read().summary()) .map(|partition| partition.read().summary())
.ok() .ok()
@ -840,7 +840,7 @@ impl Db {
debug!(?duration, "cleanup worker sleeps"); debug!(?duration, "cleanup worker sleeps");
tokio::time::sleep(duration).await; tokio::time::sleep(duration).await;
if let Err(e) = cleanup_unreferenced_parquet_files(&self.preserved_catalog, 1_000).await { if let Err(e) = cleanup_unreferenced_parquet_files(&self.preserved_catalog, Arc::clone(&self.catalog), 1_000).await {
error!(%e, "error in background cleanup task"); error!(%e, "error in background cleanup task");
} }
} => {}, } => {},
@ -918,7 +918,7 @@ impl Db {
return DatabaseNotWriteable {}.fail(); return DatabaseNotWriteable {}.fail();
} }
if let Some(hard_limit) = buffer_size_hard { if let Some(hard_limit) = buffer_size_hard {
if self.preserved_catalog.state().metrics().memory().total() > hard_limit.get() { if self.catalog.metrics().memory().total() > hard_limit.get() {
return HardLimitReached {}.fail(); return HardLimitReached {}.fail();
} }
} }
@ -932,8 +932,7 @@ impl Db {
} }
let partition = self let partition = self
.preserved_catalog .catalog
.state()
.get_or_create_partition(table_batch.name(), partition_key); .get_or_create_partition(table_batch.name(), partition_key);
let mut partition = partition.write(); let mut partition = partition.write();
@ -970,11 +969,7 @@ impl Db {
table_batch.name(), table_batch.name(),
MutableBufferChunkMetrics::new( MutableBufferChunkMetrics::new(
&metrics, &metrics,
self.preserved_catalog self.catalog.metrics().memory().mutable_buffer(),
.state()
.metrics()
.memory()
.mutable_buffer(),
), ),
); );
@ -1349,11 +1344,12 @@ impl CatalogState for Catalog {
Ok(()) Ok(())
} }
} }
}
fn files(&self) -> HashMap<DirsAndFileName, Arc<IoxParquetMetaData>> { fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData {
let mut files = HashMap::new(); let mut files = HashMap::new();
for chunk in self.chunks() { for chunk in catalog.chunks() {
let guard = chunk.read(); let guard = chunk.read();
if let catalog::chunk::ChunkStage::Persisted { parquet, .. } = guard.stage() { if let catalog::chunk::ChunkStage::Persisted { parquet, .. } = guard.stage() {
let path: DirsAndFileName = parquet.path().into(); let path: DirsAndFileName = parquet.path().into();
@ -1361,8 +1357,7 @@ impl CatalogState for Catalog {
} }
} }
files CheckpointData { files }
}
} }
pub mod test_helpers { pub mod test_helpers {
@ -1430,6 +1425,7 @@ mod tests {
}; };
use parquet_file::{ use parquet_file::{
catalog::test_helpers::assert_catalog_state_implementation, catalog::test_helpers::assert_catalog_state_implementation,
metadata::IoxParquetMetaData,
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data}, test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
}; };
use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
@ -2175,11 +2171,7 @@ mod tests {
let after_write = Utc::now(); let after_write = Utc::now();
let last_write_prev = { let last_write_prev = {
let partition = db let partition = db.catalog.partition("cpu", partition_key).unwrap();
.preserved_catalog
.state()
.partition("cpu", partition_key)
.unwrap();
let partition = partition.read(); let partition = partition.read();
assert_ne!(partition.created_at(), partition.last_write_at()); assert_ne!(partition.created_at(), partition.last_write_at());
@ -2190,11 +2182,7 @@ mod tests {
write_lp(&db, "cpu bar=1 20"); write_lp(&db, "cpu bar=1 20");
{ {
let partition = db let partition = db.catalog.partition("cpu", partition_key).unwrap();
.preserved_catalog
.state()
.partition("cpu", partition_key)
.unwrap();
let partition = partition.read(); let partition = partition.read();
assert!(last_write_prev < partition.last_write_at()); assert!(last_write_prev < partition.last_write_at());
} }
@ -2219,11 +2207,7 @@ mod tests {
.id(); .id();
let after_rollover = Utc::now(); let after_rollover = Utc::now();
let partition = db let partition = db.catalog.partition("cpu", partition_key).unwrap();
.preserved_catalog
.state()
.partition("cpu", partition_key)
.unwrap();
let partition = partition.read(); let partition = partition.read();
let chunk = partition.chunk(chunk_id).unwrap(); let chunk = partition.chunk(chunk_id).unwrap();
let chunk = chunk.read(); let chunk = chunk.read();
@ -2251,15 +2235,11 @@ mod tests {
write_lp(&db, "cpu bar=1 10"); write_lp(&db, "cpu bar=1 10");
write_lp(&db, "cpu bar=1 20"); write_lp(&db, "cpu bar=1 20");
let partitions = db.preserved_catalog.state().partition_keys(); let partitions = db.catalog.partition_keys();
assert_eq!(partitions.len(), 1); assert_eq!(partitions.len(), 1);
let partition_key = partitions.into_iter().next().unwrap(); let partition_key = partitions.into_iter().next().unwrap();
let partition = db let partition = db.catalog.partition("cpu", &partition_key).unwrap();
.preserved_catalog
.state()
.partition("cpu", &partition_key)
.unwrap();
let partition = partition.read(); let partition = partition.read();
let chunks: Vec<_> = partition.chunks().collect(); let chunks: Vec<_> = partition.chunks().collect();
@ -2292,7 +2272,7 @@ mod tests {
order: Order::Desc, order: Order::Desc,
sort: Sort::LastWriteTime, sort: Sort::LastWriteTime,
}; };
let chunks = db.preserved_catalog.state().chunks_sorted_by(&sort_rules); let chunks = db.catalog.chunks_sorted_by(&sort_rules);
let partitions: Vec<_> = chunks let partitions: Vec<_> = chunks
.into_iter() .into_iter()
.map(|x| x.read().key().to_string()) .map(|x| x.read().key().to_string())
@ -2304,7 +2284,7 @@ mod tests {
order: Order::Asc, order: Order::Asc,
sort: Sort::CreatedAtTime, sort: Sort::CreatedAtTime,
}; };
let chunks = db.preserved_catalog.state().chunks_sorted_by(&sort_rules); let chunks = db.catalog.chunks_sorted_by(&sort_rules);
let partitions: Vec<_> = chunks let partitions: Vec<_> = chunks
.into_iter() .into_iter()
.map(|x| x.read().key().to_string()) .map(|x| x.read().key().to_string())
@ -2414,12 +2394,7 @@ mod tests {
.sum(); .sum();
assert_eq!( assert_eq!(
db.preserved_catalog db.catalog.metrics().memory().mutable_buffer().get_total(),
.state()
.metrics()
.memory()
.mutable_buffer()
.get_total(),
size size
); );
@ -2552,32 +2527,14 @@ mod tests {
); );
assert_eq!( assert_eq!(
db.preserved_catalog db.catalog.metrics().memory().mutable_buffer().get_total(),
.state()
.metrics()
.memory()
.mutable_buffer()
.get_total(),
64 + 2190 + 87 64 + 2190 + 87
); );
assert_eq!( assert_eq!(
db.preserved_catalog db.catalog.metrics().memory().read_buffer().get_total(),
.state()
.metrics()
.memory()
.read_buffer()
.get_total(),
1484 1484
); );
assert_eq!( assert_eq!(db.catalog.metrics().memory().parquet().get_total(), 663);
db.preserved_catalog
.state()
.metrics()
.memory()
.parquet()
.get_total(),
663
);
} }
#[tokio::test] #[tokio::test]
@ -2863,7 +2820,7 @@ mod tests {
.eq(0.) .eq(0.)
.unwrap(); .unwrap();
let chunks = db.preserved_catalog.state().chunks(); let chunks = db.catalog.chunks();
assert_eq!(chunks.len(), 1); assert_eq!(chunks.len(), 1);
let chunk_a = Arc::clone(&chunks[0]); let chunk_a = Arc::clone(&chunks[0]);
@ -2958,7 +2915,7 @@ mod tests {
let server_id = ServerId::try_from(1).unwrap(); let server_id = ServerId::try_from(1).unwrap();
let db_name = "preserved_catalog_test"; let db_name = "preserved_catalog_test";
let preserved_catalog = let (preserved_catalog, _catalog) =
PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::new_empty( PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::new_empty(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
@ -3029,7 +2986,7 @@ mod tests {
} }
} }
paths_expected.sort(); paths_expected.sort();
let preserved_catalog = let (_preserved_catalog, catalog) =
PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::load( PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::load(
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
@ -3040,8 +2997,7 @@ mod tests {
.unwrap() .unwrap()
.unwrap(); .unwrap();
let paths_actual = { let paths_actual = {
let state = preserved_catalog.state(); let mut tmp: Vec<String> = catalog.parquet_files.keys().map(|p| p.display()).collect();
let mut tmp: Vec<String> = state.parquet_files.keys().map(|p| p.display()).collect();
tmp.sort(); tmp.sort();
tmp tmp
}; };
@ -3251,7 +3207,11 @@ mod tests {
metrics_registry, metrics_registry,
metric_labels: vec![], metric_labels: vec![],
}; };
assert_catalog_state_implementation::<Catalog>(empty_input).await; assert_catalog_state_implementation::<Catalog, _>(
empty_input,
checkpoint_data_from_catalog,
)
.await;
} }
async fn create_parquet_chunk(db: &Db) -> (String, String, u32) { async fn create_parquet_chunk(db: &Db) -> (String, String, u32) {

View File

@ -77,7 +77,7 @@ impl<'a> LifecycleDb for &'a Db {
type Chunk = LockableCatalogChunk<'a>; type Chunk = LockableCatalogChunk<'a>;
fn buffer_size(self) -> usize { fn buffer_size(self) -> usize {
self.preserved_catalog.state().metrics().memory().total() self.catalog.metrics().memory().total()
} }
fn rules(self) -> LifecycleRules { fn rules(self) -> LifecycleRules {
@ -85,8 +85,7 @@ impl<'a> LifecycleDb for &'a Db {
} }
fn chunks(self, sort_order: &SortOrder) -> Vec<Self::Chunk> { fn chunks(self, sort_order: &SortOrder) -> Vec<Self::Chunk> {
self.preserved_catalog self.catalog
.state()
.chunks_sorted_by(sort_order) .chunks_sorted_by(sort_order)
.into_iter() .into_iter()
.map(|chunk| LockableCatalogChunk { db: self, chunk }) .map(|chunk| LockableCatalogChunk { db: self, chunk })

View File

@ -325,10 +325,10 @@ impl InitStatus {
.map_err(|e| Box::new(e) as _) .map_err(|e| Box::new(e) as _)
.context(CatalogLoadError) .context(CatalogLoadError)
{ {
Ok(preserved_catalog) => { Ok((preserved_catalog, catalog)) => {
// everything is there, can create DB // everything is there, can create DB
handle handle
.commit_db(server_id, store, exec, preserved_catalog, rules) .commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
.map_err(Box::new) .map_err(Box::new)
.context(CreateDbError)?; .context(CreateDbError)?;
Ok(()) Ok(())
@ -418,9 +418,9 @@ impl InitStatus {
.map_err(|e| Box::new(e) as _) .map_err(|e| Box::new(e) as _)
.context(CatalogLoadError) .context(CatalogLoadError)
{ {
Ok(preserved_catalog) => { Ok((preserved_catalog, catalog)) => {
handle handle
.commit_db(server_id, store, exec, preserved_catalog, None) .commit_db(server_id, store, exec, preserved_catalog, catalog, None)
.map_err(|e | { .map_err(|e | {
warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover"); warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
Box::new(e) Box::new(e)

View File

@ -487,7 +487,7 @@ where
let db_reservation = self.config.create_db(rules.name.clone())?; let db_reservation = self.config.create_db(rules.name.clone())?;
self.persist_database_rules(rules.clone()).await?; self.persist_database_rules(rules.clone()).await?;
let preserved_catalog = load_or_create_preserved_catalog( let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
rules.db_name(), rules.db_name(),
Arc::clone(&self.store), Arc::clone(&self.store),
server_id, server_id,
@ -503,6 +503,7 @@ where
Arc::clone(&self.store), Arc::clone(&self.store),
Arc::clone(&self.exec), Arc::clone(&self.exec),
preserved_catalog, preserved_catalog,
catalog,
rules, rules,
)?; )?;
@ -1962,7 +1963,7 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
let preserved_catalog = PreservedCatalog::<TestCatalogState>::load( let (preserved_catalog, _catalog) = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&store), Arc::clone(&store),
server_id, server_id,
db_name_catalog_broken.to_string(), db_name_catalog_broken.to_string(),

View File

@ -57,7 +57,7 @@ impl TestDbBuilder {
let exec = Arc::new(Executor::new(1)); let exec = Arc::new(Executor::new(1));
let metrics_registry = Arc::new(metrics::MetricRegistry::new()); let metrics_registry = Arc::new(metrics::MetricRegistry::new());
let preserved_catalog = load_or_create_preserved_catalog( let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
db_name.as_str(), db_name.as_str(),
Arc::clone(&object_store), Arc::clone(&object_store),
server_id, server_id,
@ -87,6 +87,7 @@ impl TestDbBuilder {
exec, exec,
Arc::new(JobRegistry::new()), Arc::new(JobRegistry::new()),
preserved_catalog, preserved_catalog,
catalog,
self.write_buffer, self.write_buffer,
), ),
} }