Merge pull request #1772 from influxdata/crepererum/issue1740-b

refactor: decouple preserved and in-mem catalog (part 2)
kodiakhq[bot] 2021-06-22 13:31:37 +00:00 committed by GitHub
commit 8b09a02136
7 changed files with 273 additions and 270 deletions


@ -6,7 +6,6 @@ use std::{
},
convert::TryInto,
fmt::{Debug, Display},
marker::PhantomData,
num::TryFromIntError,
str::FromStr,
sync::Arc,
@ -257,40 +256,8 @@ pub trait CatalogState {
fn remove(tstate: &mut Self::TransactionState, path: DirsAndFileName) -> Result<()>;
}
/// Find last transaction-start-timestamp.
///
/// This method is designed to read and verify as little as possible and should also work on most broken catalogs.
pub async fn find_last_transaction_timestamp(
object_store: &ObjectStore,
server_id: ServerId,
db_name: &str,
) -> Result<Option<DateTime<Utc>>> {
let mut res = None;
for (path, _file_type, _revision_counter, _uuid) in
list_files(object_store, server_id, db_name).await?
{
match load_transaction_proto(object_store, &path).await {
Ok(proto) => match parse_timestamp(&proto.start_timestamp) {
Ok(ts) => {
res = Some(res.map_or(ts, |res: DateTime<Utc>| res.max(ts)));
}
Err(e) => warn!(%e, ?path, "Cannot parse timestamp"),
},
Err(e @ Error::Read { .. }) => {
// bubble up IO error
return Err(e);
}
Err(e) => warn!(%e, ?path, "Cannot read transaction"),
}
}
Ok(res)
}
/// In-memory view of the preserved catalog.
pub struct PreservedCatalog<S>
where
S: CatalogState,
{
pub struct PreservedCatalog {
// We need an RWLock AND a semaphore, so that readers are NOT blocked during open transactions. Note that this
// requires a new transaction to:
//
@ -315,32 +282,9 @@ where
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: String,
// temporary measure to keep the API a bit more stable
phantom: PhantomData<S>,
}
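
The locking scheme described in the struct comment above can be reduced to a small sketch. Everything here apart from the `previous_tkey` name is assumed for illustration (the real field set and key type are richer); tokio's `Semaphore` hands out permits in FIFO order, which is what gives the fairness mentioned later for `open_transaction`:

use parking_lot::RwLock;
use tokio::sync::Semaphore;

struct CatalogCore {
    // readers only ever hold this briefly, so they are not blocked while a
    // transaction is open
    previous_tkey: RwLock<Option<u64>>,
    // serializes transactions; waiters are served in request order
    transaction_semaphore: Semaphore,
}

async fn commit(core: &CatalogCore, new_tkey: u64) {
    // one open transaction at a time
    let _permit = core
        .transaction_semaphore
        .acquire()
        .await
        .expect("semaphore not closed");
    // ... long-running object-store writes happen here, read lock NOT held ...
    // short critical section: publish the new state
    *core.previous_tkey.write() = Some(new_tkey);
}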
/// Deletes catalog.
///
/// **Always create a backup before wiping your data!**
///
This also works for broken catalogs, and it succeeds even if no catalog is present.
///
/// Note that wiping the catalog will NOT wipe any referenced parquet files.
pub async fn wipe(object_store: &ObjectStore, server_id: ServerId, db_name: &str) -> Result<()> {
for (path, _file_type, _revision_counter, _uuid) in
list_files(object_store, server_id, db_name).await?
{
object_store.delete(&path).await.context(Write)?;
}
Ok(())
}
impl<S> PreservedCatalog<S>
where
S: CatalogState + Send + Sync,
{
impl PreservedCatalog {
/// Checks if a preserved catalog exists.
pub async fn exists(
object_store: &ObjectStore,
@ -352,18 +296,69 @@ where
.is_empty())
}
/// Find last transaction-start-timestamp.
///
/// This method is designed to read and verify as little as possible and should also work on most broken catalogs.
pub async fn find_last_transaction_timestamp(
object_store: &ObjectStore,
server_id: ServerId,
db_name: &str,
) -> Result<Option<DateTime<Utc>>> {
let mut res = None;
for (path, _file_type, _revision_counter, _uuid) in
list_files(object_store, server_id, db_name).await?
{
match load_transaction_proto(object_store, &path).await {
Ok(proto) => match parse_timestamp(&proto.start_timestamp) {
Ok(ts) => {
res = Some(res.map_or(ts, |res: DateTime<Utc>| res.max(ts)));
}
Err(e) => warn!(%e, ?path, "Cannot parse timestamp"),
},
Err(e @ Error::Read { .. }) => {
// bubble up IO error
return Err(e);
}
Err(e) => warn!(%e, ?path, "Cannot read transaction"),
}
}
Ok(res)
}
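
A minimal call sketch (object store and server ID setup assumed from the surrounding tests): the scan keeps the maximum parseable start timestamp and only aborts on hard read (I/O) errors, so it tolerates partially broken catalogs.

let ts = PreservedCatalog::find_last_transaction_timestamp(&object_store, server_id, "db1")
    .await?;
if let Some(ts) = ts {
    println!("latest transaction started at {}", ts);
}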
/// Deletes catalog.
///
/// **Always create a backup before wiping your data!**
///
This also works for broken catalogs, and it succeeds even if no catalog is present.
///
/// Note that wiping the catalog will NOT wipe any referenced parquet files.
pub async fn wipe(
object_store: &ObjectStore,
server_id: ServerId,
db_name: &str,
) -> Result<()> {
for (path, _file_type, _revision_counter, _uuid) in
list_files(object_store, server_id, db_name).await?
{
object_store.delete(&path).await.context(Write)?;
}
Ok(())
}
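
A hedged usage sketch: after a wipe only the catalog transaction files are gone, while any parquet files they referenced remain in the object store.

PreservedCatalog::wipe(&object_store, server_id, "db1").await?;
assert!(!PreservedCatalog::exists(&object_store, server_id, "db1").await?);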
/// Create new catalog w/o any data.
///
An empty transaction will be used to mark the catalog start so that concurrently opened but still-empty catalogs can
easily be detected.
pub async fn new_empty(
pub async fn new_empty<S>(
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: impl Into<String> + Send,
db_name: String,
state_data: S::EmptyInput,
) -> Result<(Self, Arc<S>)> {
let db_name = db_name.into();
) -> Result<(Self, Arc<S>)>
where
S: CatalogState + Send + Sync,
{
if Self::exists(&object_store, server_id, &db_name).await? {
return Err(Error::AlreadyExists {});
}
@ -375,7 +370,6 @@ where
object_store,
server_id,
db_name,
phantom: PhantomData,
};
// add empty transaction
@ -389,12 +383,15 @@ where
///
/// Loading starts at the latest checkpoint or -- if none exists -- at transaction `0`. Transactions before that
/// point are neither verified nor are they required to exist.
pub async fn load(
pub async fn load<S>(
object_store: Arc<ObjectStore>,
server_id: ServerId,
db_name: String,
state_data: S::EmptyInput,
) -> Result<Option<(Self, Arc<S>)>> {
) -> Result<Option<(Self, Arc<S>)>>
where
S: CatalogState + Send + Sync,
{
// parse all paths into revisions
let mut transactions: HashMap<u64, Uuid> = HashMap::new();
let mut max_revision = None;
@ -492,7 +489,6 @@ where
object_store,
server_id,
db_name,
phantom: PhantomData,
},
state,
)))
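
The shape of this refactor, reduced to a toy example: the `S: CatalogState` parameter (and the `PhantomData` field that carried it) moves off the struct and onto the constructors that actually produce the state, so unrelated code can hold a plain `PreservedCatalog`. The names below are illustrative only:

// before: struct Catalog<S> { phantom: PhantomData<S>, /* ... */ }
struct Catalog;

impl Catalog {
    // after: the state type is chosen per call site instead of per instance
    fn new_empty<S: Default>() -> (Self, S) {
        (Self, S::default())
    }
}

// call sites switch from Catalog::<MyState>::new_empty()
// to Catalog::new_empty::<MyState>()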
@ -504,17 +500,23 @@ where
/// transaction handle is dropped. The newly created transaction will contain the state after `await` (esp.
/// post-blocking). This system is fair, which means that transactions are given out in the order they were
/// requested.
pub async fn open_transaction(&self, state: Arc<S>) -> TransactionHandle<'_, S> {
pub async fn open_transaction<S>(&self, state: Arc<S>) -> TransactionHandle<'_, S>
where
S: CatalogState + Send + Sync,
{
self.open_transaction_with_uuid(Uuid::new_v4(), state).await
}
/// Crate-private API to open a transaction with a specified UUID. Should only be used for catalog rebuilding or
/// with a fresh V4-UUID!
pub(crate) async fn open_transaction_with_uuid(
pub(crate) async fn open_transaction_with_uuid<S>(
&self,
uuid: Uuid,
state: Arc<S>,
) -> TransactionHandle<'_, S> {
) -> TransactionHandle<'_, S>
where
S: CatalogState + Send + Sync,
{
TransactionHandle::new(self, uuid, state).await
}
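
Call sites stay unchanged by this move, because `S` is inferred from the `Arc<S>` argument. A hedged sketch (the commit call is assumed from the wider transaction API, which this diff does not show):

let mut transaction = catalog.open_transaction(Arc::clone(&state)).await;
// ... record file additions/removals on the handle ...
// committing persists the transaction (assumed API; the real handle may take
// checkpoint data); dropping the handle without committing aborts it
transaction.commit().await?;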
@ -543,10 +545,7 @@ where
}
}
impl<S> Debug for PreservedCatalog<S>
where
S: CatalogState,
{
impl Debug for PreservedCatalog {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PreservedCatalog{{..}}")
}
@ -1032,7 +1031,7 @@ pub struct TransactionHandle<'c, S>
where
S: CatalogState + Send + Sync,
{
catalog: &'c PreservedCatalog<S>,
catalog: &'c PreservedCatalog,
// NOTE: The permit is technically used since we use it to reference the semaphore. It implements `drop` which we
// rely on.
@ -1047,7 +1046,7 @@ where
S: CatalogState + Send + Sync,
{
async fn new(
catalog: &'c PreservedCatalog<S>,
catalog: &'c PreservedCatalog,
uuid: Uuid,
state: Arc<S>,
) -> TransactionHandle<'c, S> {
@ -1373,10 +1372,7 @@ pub mod test_helpers {
}
/// Break preserved catalog by moving one of the transaction files into a weird unknown version.
pub async fn break_catalog_with_weird_version<S>(catalog: &PreservedCatalog<S>)
where
S: CatalogState + Send + Sync,
{
pub async fn break_catalog_with_weird_version(catalog: &PreservedCatalog) {
let tkey = get_tkey(catalog);
let path = file_path(
&catalog.object_store,
@ -1395,10 +1391,7 @@ pub mod test_helpers {
}
/// Helper function to ensure that guards don't leak into the future state machine.
fn get_tkey<S>(catalog: &PreservedCatalog<S>) -> TransactionKey
where
S: CatalogState + Send + Sync,
{
fn get_tkey(catalog: &PreservedCatalog) -> TransactionKey {
let guard = catalog.previous_tkey.read();
guard
.as_ref()
@ -1416,7 +1409,7 @@ pub mod test_helpers {
{
// empty state
let object_store = make_object_store();
let (catalog, mut state) = PreservedCatalog::<S>::new_empty(
let (catalog, mut state) = PreservedCatalog::new_empty::<S>(
Arc::clone(&object_store),
ServerId::try_from(1).unwrap(),
"db1".to_string(),
@ -1686,11 +1679,11 @@ mod tests {
let db_name = "db1";
assert!(
!PreservedCatalog::<TestCatalogState>::exists(&object_store, server_id, db_name,)
!PreservedCatalog::exists(&object_store, server_id, db_name,)
.await
.unwrap()
);
assert!(PreservedCatalog::<TestCatalogState>::load(
assert!(PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -1700,7 +1693,7 @@ mod tests {
.unwrap()
.is_none());
PreservedCatalog::<TestCatalogState>::new_empty(
PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -1709,12 +1702,10 @@ mod tests {
.await
.unwrap();
assert!(
PreservedCatalog::<TestCatalogState>::exists(&object_store, server_id, db_name,)
.await
.unwrap()
);
assert!(PreservedCatalog::<TestCatalogState>::load(
assert!(PreservedCatalog::exists(&object_store, server_id, db_name,)
.await
.unwrap());
assert!(PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -1740,7 +1731,7 @@ mod tests {
#[tokio::test]
async fn test_load_from_empty_store() {
let object_store = make_object_store();
let option = PreservedCatalog::<TestCatalogState>::load(
let option = PreservedCatalog::load::<TestCatalogState>(
object_store,
make_server_id(),
"db1".to_string(),
@ -1794,7 +1785,7 @@ mod tests {
create_empty_file(&object_store, &path).await;
// no data present
let option = PreservedCatalog::<TestCatalogState>::load(
let option = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.clone(),
@ -1828,7 +1819,7 @@ mod tests {
checked_delete(&object_store, &path).await;
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -1846,7 +1837,7 @@ mod tests {
assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// break transaction file
let (catalog, _state) = PreservedCatalog::<TestCatalogState>::load(
let (catalog, _state) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -1858,7 +1849,7 @@ mod tests {
break_catalog_with_weird_version(&catalog).await;
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -1895,7 +1886,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -1934,7 +1925,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -1974,7 +1965,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2011,7 +2002,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2048,7 +2039,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2082,7 +2073,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2116,7 +2107,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2158,7 +2149,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2174,7 +2165,7 @@ mod tests {
#[tokio::test]
async fn test_transaction_handle_debug() {
let object_store = make_object_store();
let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
object_store,
make_server_id(),
"db1".to_string(),
@ -2231,7 +2222,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2282,7 +2273,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2327,7 +2318,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2364,7 +2355,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2404,7 +2395,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2441,7 +2432,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2478,7 +2469,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2515,7 +2506,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2558,7 +2549,7 @@ mod tests {
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
let res = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2583,7 +2574,7 @@ mod tests {
let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// re-open catalog
let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::load(
let (catalog, mut state) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2746,12 +2737,7 @@ mod tests {
}
}
fn record(
&mut self,
catalog: &PreservedCatalog<TestCatalogState>,
state: &TestCatalogState,
aborted: bool,
) {
fn record(&mut self, catalog: &PreservedCatalog, state: &TestCatalogState, aborted: bool) {
self.tkeys
.push(catalog.previous_tkey.read().clone().unwrap());
self.states.push(state.clone());
@ -2872,7 +2858,7 @@ mod tests {
let server_id = make_server_id();
let db_name = "db1";
PreservedCatalog::<TestCatalogState>::new_empty(
PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2881,7 +2867,7 @@ mod tests {
.await
.unwrap();
let res = PreservedCatalog::<TestCatalogState>::new_empty(
let res = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2897,7 +2883,9 @@ mod tests {
let server_id = make_server_id();
let db_name = "db1";
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
}
#[tokio::test]
@ -2910,15 +2898,17 @@ mod tests {
assert_single_catalog_inmem_works(&object_store, make_server_id(), "db1").await;
// wipe
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// `exists` and `load` both report "no data"
assert!(
!PreservedCatalog::<TestCatalogState>::exists(&object_store, server_id, db_name,)
!PreservedCatalog::exists(&object_store, server_id, db_name,)
.await
.unwrap()
);
assert!(PreservedCatalog::<TestCatalogState>::load(
assert!(PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2929,7 +2919,7 @@ mod tests {
.is_none());
// can create new catalog
PreservedCatalog::<TestCatalogState>::new_empty(
PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2949,7 +2939,7 @@ mod tests {
assert_single_catalog_inmem_works(&object_store, make_server_id(), "db1").await;
// break
let (catalog, _state) = PreservedCatalog::<TestCatalogState>::load(
let (catalog, _state) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2961,15 +2951,17 @@ mod tests {
break_catalog_with_weird_version(&catalog).await;
// wipe
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// `exists` and `load` both report "no data"
assert!(
!PreservedCatalog::<TestCatalogState>::exists(&object_store, server_id, db_name,)
!PreservedCatalog::exists(&object_store, server_id, db_name,)
.await
.unwrap()
);
assert!(PreservedCatalog::<TestCatalogState>::load(
assert!(PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -2980,7 +2972,7 @@ mod tests {
.is_none());
// can create new catalog
PreservedCatalog::<TestCatalogState>::new_empty(
PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -3006,7 +2998,9 @@ mod tests {
create_empty_file(&object_store, &path).await;
// wipe
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// check file is still there
let prefix = catalog_path(&object_store, server_id, &db_name);
@ -3025,7 +3019,7 @@ mod tests {
#[tokio::test]
async fn test_transaction_handle_revision_counter() {
let object_store = make_object_store();
let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
object_store,
make_server_id(),
"db1".to_string(),
@ -3041,7 +3035,7 @@ mod tests {
#[tokio::test]
async fn test_transaction_handle_uuid() {
let object_store = make_object_store();
let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
object_store,
make_server_id(),
"db1".to_string(),
@ -3062,10 +3056,11 @@ mod tests {
let db_name = "db1";
let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
let ts =
PreservedCatalog::find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
// last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
assert!(trace.aborted[trace.aborted.len() - 1]);
@ -3092,12 +3087,14 @@ mod tests {
let server_id = make_server_id();
let db_name = "db1";
assert!(
find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.is_none()
);
assert!(PreservedCatalog::find_last_transaction_timestamp(
&object_store,
server_id,
db_name
)
.await
.unwrap()
.is_none());
}
#[tokio::test]
@ -3123,10 +3120,11 @@ mod tests {
.await
.unwrap();
let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
let ts =
PreservedCatalog::find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
// last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
assert!(trace.aborted[trace.aborted.len() - 1]);
@ -3175,10 +3173,11 @@ mod tests {
.await
.unwrap();
let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
let ts =
PreservedCatalog::find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
// last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
assert!(trace.aborted[trace.aborted.len() - 1]);
@ -3206,7 +3205,7 @@ mod tests {
let db_name = "db1";
let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::load(
let (catalog, mut state) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -3240,10 +3239,11 @@ mod tests {
}
drop(catalog);
let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
let ts =
PreservedCatalog::find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
// check timestamps
assert!(!trace.aborted[trace.aborted.len() - 1]);
@ -3295,12 +3295,12 @@ mod tests {
let db_name = "db1";
assert!(
!PreservedCatalog::<TestCatalogState>::exists(&object_store, server_id, db_name,)
!PreservedCatalog::exists(&object_store, server_id, db_name,)
.await
.unwrap()
);
let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
@ -3340,12 +3340,10 @@ mod tests {
drop(catalog);
assert!(
PreservedCatalog::<TestCatalogState>::exists(&object_store, server_id, db_name,)
.await
.unwrap()
);
assert!(PreservedCatalog::<TestCatalogState>::load(
assert!(PreservedCatalog::exists(&object_store, server_id, db_name,)
.await
.unwrap());
assert!(PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),


@ -40,7 +40,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// This will hold the transaction lock while the list of files is being gathered. To limit the time the lock is held
use `max_files`, which caps the number of files deleted in a single cleanup round.
pub async fn cleanup_unreferenced_parquet_files<S>(
catalog: &PreservedCatalog<S>,
catalog: &PreservedCatalog,
state: Arc<S>,
max_files: usize,
) -> Result<()>
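
A hedged usage sketch for the bounded cleanup: cap each round so the transaction lock is held only briefly, and repeat rounds until everything unreferenced is gone.

// delete at most 1000 files per round; callers can loop if a round used
// its full budget
cleanup_unreferenced_parquet_files(&catalog, Arc::clone(&state), 1_000).await?;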
@ -56,7 +56,7 @@ where
let db_name = catalog.db_name();
let all_known = {
// replay catalog transactions to track ALL (even dropped) files that are referenced
let (_catalog, state) = PreservedCatalog::<TracerCatalogState>::load(
let (_catalog, state) = PreservedCatalog::load::<TracerCatalogState>(
Arc::clone(&store),
server_id,
db_name.to_string(),
@ -184,10 +184,10 @@ mod tests {
let server_id = make_server_id();
let db_name = "db1";
let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -205,10 +205,10 @@ mod tests {
let server_id = make_server_id();
let db_name = db_name();
let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -266,10 +266,10 @@ mod tests {
let server_id = make_server_id();
let db_name = db_name();
let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -306,10 +306,10 @@ mod tests {
let server_id = make_server_id();
let db_name = db_name();
let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await


@ -60,7 +60,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Creates a new catalog from parquet files.
///
/// Users are required to [wipe](crate::catalog::wipe) the existing catalog before running this
/// Users are required to [wipe](PreservedCatalog::wipe) the existing catalog before running this
/// procedure (**after creating a backup!**).
///
/// This will create a catalog checkpoint for the very last transaction.
@ -89,17 +89,16 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
- **Multiple Transactions:** If there are multiple transactions with the same revision but different UUIDs, this
routine cannot reconstruct a single linear revision history. Make sure to
[clean up](crate::cleanup::cleanup_unreferenced_parquet_files) regularly to avoid this case.
pub async fn rebuild_catalog<S, N>(
pub async fn rebuild_catalog<S>(
object_store: Arc<ObjectStore>,
search_location: &Path,
server_id: ServerId,
db_name: N,
db_name: String,
catalog_empty_input: S::EmptyInput,
ignore_metadata_read_failure: bool,
) -> Result<(PreservedCatalog<S>, Arc<S>)>
) -> Result<(PreservedCatalog, Arc<S>)>
where
S: CatalogState + Send + Sync,
N: Into<String> + Send,
{
// collect all revisions from parquet files
let mut revisions =
@ -107,7 +106,7 @@ where
// create new empty catalog
let (catalog, mut state) =
PreservedCatalog::<S>::new_empty(object_store, server_id, db_name, catalog_empty_input)
PreservedCatalog::new_empty::<S>(object_store, server_id, db_name, catalog_empty_input)
.await
.context(NewEmptyFailure)?;
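
The documented flow end to end, in a hedged sketch (`search_location` and the other variable names are assumed from the tests below; always create a backup before the wipe):

PreservedCatalog::wipe(&object_store, server_id, db_name).await?;
let (catalog, state) = rebuild_catalog::<TestCatalogState>(
    Arc::clone(&object_store),
    &search_location,    // where to scan for parquet files
    server_id,
    db_name.to_string(),
    (),                  // TestCatalogState::EmptyInput
    false,               // fail on unreadable parquet metadata
)
.await?;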
@ -267,7 +266,7 @@ mod tests {
use crate::{catalog::test_helpers::TestCatalogState, storage::MemWriter};
use crate::{
catalog::{wipe, PreservedCatalog},
catalog::PreservedCatalog,
storage::Storage,
test_utils::{make_object_store, make_record_batch},
};
@ -279,10 +278,10 @@ mod tests {
let db_name = "db1";
// build catalog with some data
let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -345,15 +344,17 @@ mod tests {
// wipe catalog
drop(catalog);
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// rebuild
let path = object_store.new_path();
let (catalog, state) = rebuild_catalog::<TestCatalogState, _>(
let (catalog, state) = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name,
db_name.to_string(),
(),
false,
)
@ -377,10 +378,10 @@ mod tests {
let db_name = "db1";
// build empty catalog
let (catalog, _state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, _state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -388,15 +389,17 @@ mod tests {
// wipe catalog
drop(catalog);
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// rebuild
let path = object_store.new_path();
let (catalog, state) = rebuild_catalog::<TestCatalogState, _>(
let (catalog, state) = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name,
db_name.to_string(),
(),
false,
)
@ -415,10 +418,10 @@ mod tests {
let db_name = "db1";
// build catalog with same data
let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
let catalog = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -429,15 +432,17 @@ mod tests {
// wipe catalog
drop(catalog);
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// rebuild
let path = object_store.new_path();
let res = rebuild_catalog::<TestCatalogState, _>(
let res = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name,
db_name.to_string(),
(),
false,
)
@ -454,10 +459,10 @@ mod tests {
let db_name = "db1";
// build catalog with same data
let (catalog, state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -492,15 +497,17 @@ mod tests {
// wipe catalog
drop(catalog);
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// rebuild
let path = object_store.new_path();
let res = rebuild_catalog::<TestCatalogState, _>(
let res = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name,
db_name.to_string(),
(),
false,
)
@ -516,10 +523,10 @@ mod tests {
let db_name = "db1";
// build catalog with same data
let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
let catalog = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -530,15 +537,17 @@ mod tests {
// wipe catalog
drop(catalog);
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// rebuild (do not ignore errors)
let path = object_store.new_path();
let res = rebuild_catalog::<TestCatalogState, _>(
let res = rebuild_catalog::<TestCatalogState>(
Arc::clone(&object_store),
&path,
server_id,
db_name,
db_name.to_string(),
(),
false,
)
@ -547,11 +556,11 @@ mod tests {
.starts_with("Cannot read IOx metadata from parquet file"));
// rebuild (ignore errors)
let (catalog, state) = rebuild_catalog::<TestCatalogState, _>(
let (catalog, state) = rebuild_catalog::<TestCatalogState>(
object_store,
&path,
server_id,
db_name,
db_name.to_string(),
(),
true,
)
@ -568,10 +577,10 @@ mod tests {
let db_name = "db1";
// build catalog with some data (2 transactions + initial empty one)
let (catalog, mut state) = PreservedCatalog::<TestCatalogState>::new_empty(
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name,
db_name.to_string(),
(),
)
.await
@ -619,15 +628,17 @@ mod tests {
// wipe catalog
drop(catalog);
wipe(&object_store, server_id, db_name).await.unwrap();
PreservedCatalog::wipe(&object_store, server_id, db_name)
.await
.unwrap();
// rebuild
let path = object_store.new_path();
let catalog = rebuild_catalog::<TestCatalogState, _>(
let catalog = rebuild_catalog::<TestCatalogState>(
Arc::clone(&object_store),
&path,
server_id,
db_name,
db_name.to_string(),
(),
false,
)
@ -657,7 +668,7 @@ mod tests {
assert!(deleted);
// load catalog
let (catalog, state) = PreservedCatalog::<TestCatalogState>::load(
let (catalog, state) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),


@ -237,7 +237,7 @@ impl Config {
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog<Catalog>,
preserved_catalog: PreservedCatalog,
catalog: Arc<Catalog>,
) {
let mut state = self.state.write().expect("mutex poisoned");
@ -452,7 +452,7 @@ impl<'a> CreateDatabaseHandle<'a> {
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog<Catalog>,
preserved_catalog: PreservedCatalog,
catalog: Arc<Catalog>,
rules: DatabaseRules,
) -> Result<()> {
@ -539,7 +539,7 @@ impl<'a> RecoverDatabaseHandle<'a> {
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog<Catalog>,
preserved_catalog: PreservedCatalog,
catalog: Arc<Catalog>,
rules: Option<DatabaseRules>,
) -> Result<()> {


@ -32,10 +32,7 @@ use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::RwLock;
use parquet_file::catalog::{CheckpointData, TransactionEnd};
use parquet_file::{
catalog::{
wipe as wipe_preserved_catalog, CatalogParquetInfo, CatalogState, ChunkCreationFailed,
PreservedCatalog,
},
catalog::{CatalogParquetInfo, CatalogState, ChunkCreationFailed, PreservedCatalog},
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
cleanup::cleanup_unreferenced_parquet_files,
metadata::IoxMetadata,
@ -224,7 +221,7 @@ pub struct Db {
exec: Arc<Executor>,
/// Preserved catalog (data in object store).
preserved_catalog: Arc<PreservedCatalog<Catalog>>,
preserved_catalog: Arc<PreservedCatalog>,
/// The catalog holds chunks of data under partitions for the database.
/// The underlying chunks may be backed by different execution engines
@ -275,7 +272,7 @@ pub async fn load_or_create_preserved_catalog(
server_id: ServerId,
metrics_registry: Arc<MetricRegistry>,
wipe_on_error: bool,
) -> std::result::Result<(PreservedCatalog<Catalog>, Arc<Catalog>), parquet_file::catalog::Error> {
) -> std::result::Result<(PreservedCatalog, Arc<Catalog>), parquet_file::catalog::Error> {
let metric_labels = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("svr_id", format!("{}", server_id)),
@ -328,7 +325,7 @@ pub async fn load_or_create_preserved_catalog(
// https://github.com/influxdata/influxdb_iox/issues/1522)
// broken => wipe for now (at least during early iterations)
error!("cannot load catalog, so wipe it: {}", e);
wipe_preserved_catalog(&object_store, server_id, db_name).await?;
PreservedCatalog::wipe(&object_store, server_id, db_name).await?;
let metrics_domain =
metrics_registry.register_domain_with_labels("catalog", metric_labels.clone());
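
The wipe-on-error recovery path above, reduced to a hedged sketch. The real function wires up a metrics-backed `Catalog` state; `TestCatalogState`, `wipe_on_error`, and the plain variable names stand in for that here:

let res = PreservedCatalog::load::<TestCatalogState>(
    Arc::clone(&object_store),
    server_id,
    db_name.to_string(),
    (),
)
.await;
match res {
    // catalog loaded cleanly
    Ok(Some(pair)) => Ok(pair),
    // no catalog in the store yet: create an empty one
    Ok(None) => {
        PreservedCatalog::new_empty::<TestCatalogState>(object_store, server_id, db_name.to_string(), ())
            .await
    }
    // broken catalog and wiping is allowed: wipe, then start fresh
    Err(e) if wipe_on_error => {
        error!("cannot load catalog, so wipe it: {}", e);
        PreservedCatalog::wipe(&object_store, server_id, db_name).await?;
        PreservedCatalog::new_empty::<TestCatalogState>(object_store, server_id, db_name.to_string(), ())
            .await
    }
    // broken catalog and wiping is not allowed: bubble the error up
    Err(e) => Err(e),
}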
@ -359,7 +356,7 @@ impl Db {
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
jobs: Arc<JobRegistry>,
preserved_catalog: PreservedCatalog<Catalog>,
preserved_catalog: PreservedCatalog,
catalog: Arc<Catalog>,
write_buffer: Option<Arc<dyn WriteBuffer>>,
) -> Self {
@ -1393,7 +1390,7 @@ mod tests {
ObjectStore, ObjectStoreApi,
};
use parquet_file::{
catalog::test_helpers::assert_catalog_state_implementation,
catalog::test_helpers::{assert_catalog_state_implementation, TestCatalogState},
metadata::IoxParquetMetaData,
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
};
@ -2884,15 +2881,14 @@ mod tests {
let server_id = ServerId::try_from(1).unwrap();
let db_name = "preserved_catalog_test";
let (preserved_catalog, _catalog) =
PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::new_empty(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let (preserved_catalog, _catalog) = PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
.await;
@ -2923,15 +2919,14 @@ mod tests {
// ==================== check: empty catalog created ====================
// at this point, an empty preserved catalog exists
let maybe_preserved_catalog =
PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
let maybe_preserved_catalog = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap();
assert!(maybe_preserved_catalog.is_some());
// ==================== do: write data to parquet ====================
@ -2955,16 +2950,15 @@ mod tests {
}
}
paths_expected.sort();
let (_preserved_catalog, catalog) =
PreservedCatalog::<parquet_file::catalog::test_helpers::TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap()
.unwrap();
let (_preserved_catalog, catalog) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await
.unwrap()
.unwrap();
let paths_actual = {
let mut tmp: Vec<String> = catalog.parquet_files.keys().map(|p| p.display()).collect();
tmp.sort();


@ -9,7 +9,7 @@ use object_store::{
};
use observability_deps::tracing::{debug, error, info, warn};
use parking_lot::Mutex;
use parquet_file::catalog::wipe as wipe_preserved_catalog;
use parquet_file::catalog::PreservedCatalog;
use query::exec::Executor;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
@ -397,7 +397,7 @@ impl InitStatus {
.map_err(|e| Arc::new(e) as _)
.context(RecoverDbError)?;
wipe_preserved_catalog(&store, server_id, &db_name)
PreservedCatalog::wipe(&store, server_id, &db_name)
.await
.map_err(Box::new)
.context(PreservedCatalogWipeError)?;
@ -455,7 +455,7 @@ impl InitStatus {
.map_err(|e| Arc::new(e) as _)
.context(RecoverDbError)?;
wipe_preserved_catalog(&store, server_id, &db_name)
PreservedCatalog::wipe(&store, server_id, &db_name)
.await
.map_err(Box::new)
.context(PreservedCatalogWipeError)?;


@ -1963,7 +1963,7 @@ mod tests {
)
.await
.unwrap();
let (preserved_catalog, _catalog) = PreservedCatalog::<TestCatalogState>::load(
let (preserved_catalog, _catalog) = PreservedCatalog::load::<TestCatalogState>(
Arc::clone(&store),
server_id,
db_name_catalog_broken.to_string(),
@ -2009,7 +2009,7 @@ mod tests {
.to_string(),
"database already exists"
);
assert!(PreservedCatalog::<TestCatalogState>::exists(
assert!(PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
&db_name_existing.to_string()
@ -2019,7 +2019,7 @@ mod tests {
// 2. wiping a non-existing DB just works, but won't bring DB into existence
assert!(server.error_database(&db_name_non_existing).is_none());
PreservedCatalog::<TestCatalogState>::new_empty(
PreservedCatalog::new_empty::<TestCatalogState>(
Arc::clone(&server.store),
server.require_id().unwrap(),
db_name_non_existing.to_string(),
@ -2036,7 +2036,7 @@ mod tests {
};
assert_eq!(metadata, &expected_metadata);
tracker.join().await;
assert!(!PreservedCatalog::<TestCatalogState>::exists(
assert!(!PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
&db_name_non_existing.to_string()
@ -2057,7 +2057,7 @@ mod tests {
};
assert_eq!(metadata, &expected_metadata);
tracker.join().await;
assert!(!PreservedCatalog::<TestCatalogState>::exists(
assert!(!PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
&db_name_rules_broken.to_string()
@ -2078,7 +2078,7 @@ mod tests {
};
assert_eq!(metadata, &expected_metadata);
tracker.join().await;
assert!(PreservedCatalog::<TestCatalogState>::exists(
assert!(PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
&db_name_catalog_broken.to_string()
@ -2106,7 +2106,7 @@ mod tests {
.to_string(),
"database already exists"
);
assert!(PreservedCatalog::<TestCatalogState>::exists(
assert!(PreservedCatalog::exists(
&server.store,
server.require_id().unwrap(),
&db_name_created.to_string()