fix: Pass the database name into PreservedCatalog
parent 276aef69c9
commit 8407735e00

@@ -54,17 +54,21 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// The exclusive access can be dropped after this method returned and before calling
 /// [`delete_files`].
 pub async fn get_unreferenced_parquet_files(
+    db_name: &str,
     catalog: &PreservedCatalog,
     max_files: usize,
 ) -> Result<Vec<ParquetFilePath>> {
     let iox_object_store = catalog.iox_object_store();
     let all_known = {
         // replay catalog transactions to track ALL (even dropped) files that are referenced
-        let (_catalog, state) =
-            PreservedCatalog::load::<TracerCatalogState>(Arc::clone(&iox_object_store), ())
-                .await
-                .context(CatalogLoadError)?
-                .expect("catalog gone while reading it?");
+        let (_catalog, state) = PreservedCatalog::load::<TracerCatalogState>(
+            db_name,
+            Arc::clone(&iox_object_store),
+            (),
+        )
+        .await
+        .context(CatalogLoadError)?
+        .expect("catalog gone while reading it?");

         state.files.into_inner()
     };
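
The doc comment above is the contract that matters here: exclusive access may be dropped between listing candidates and calling `delete_files`. A minimal sketch of that lock-then-release shape, not from this commit, using a synchronous `std::sync::RwLock` where the real code holds an async lock:

```rust
use std::sync::RwLock;

// List unreferenced files while holding the write lock, then release the
// lock before the (potentially slow) deletes would run.
fn find_unreferenced(lock: &RwLock<()>, referenced: &[&str], all: &[&str]) -> Vec<String> {
    let guard = lock.write().unwrap();
    let candidates: Vec<String> = all
        .iter()
        .filter(|f| !referenced.contains(*f))
        .map(|f| f.to_string())
        .collect();
    drop(guard); // deletion can proceed without holding the lock
    candidates
}

fn main() {
    let lock = RwLock::new(());
    assert_eq!(find_unreferenced(&lock, &["keep"], &["keep", "orphan"]), ["orphan"]);
}
```
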
@@ -160,7 +164,7 @@ impl CatalogState for TracerCatalogState {
 mod tests {
     use super::*;
     use crate::{
-        catalog::test_helpers::new_empty,
+        catalog::test_helpers::{new_empty, DB_NAME},
         test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize},
     };
     use std::{collections::HashSet, sync::Arc};

@@ -173,7 +177,7 @@
         let (catalog, _state) = new_empty(&iox_object_store).await;

         // run clean-up
-        let files = get_unreferenced_parquet_files(&catalog, 1_000)
+        let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000)
             .await
             .unwrap();
         delete_files(&catalog, &files).await.unwrap();

@@ -227,7 +231,7 @@
         }

         // run clean-up
-        let files = get_unreferenced_parquet_files(&catalog, 1_000)
+        let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000)
             .await
             .unwrap();
         delete_files(&catalog, &files).await.unwrap();

@@ -285,7 +289,7 @@
             },
             async {
                 let guard = lock.write().await;
-                let files = get_unreferenced_parquet_files(&catalog, 1_000)
+                let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 1_000)
                     .await
                     .unwrap();
                 drop(guard);

@@ -319,7 +323,9 @@
         }

         // run clean-up
-        let files = get_unreferenced_parquet_files(&catalog, 2).await.unwrap();
+        let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 2)
+            .await
+            .unwrap();
         assert_eq!(files.len(), 2);
         delete_files(&catalog, &files).await.unwrap();

@@ -329,7 +335,9 @@
         assert_eq!(leftover.len(), 1);

         // run clean-up again
-        let files = get_unreferenced_parquet_files(&catalog, 2).await.unwrap();
+        let files = get_unreferenced_parquet_files(DB_NAME, &catalog, 2)
+            .await
+            .unwrap();
         assert_eq!(files.len(), 1);
         delete_files(&catalog, &files).await.unwrap();
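
The two hunks above exercise `max_files`: with three orphans and a limit of 2, one pass deletes two files and a second pass picks up the leftover. A toy model of that bounded-batch behavior (names and types are illustrative only):

```rust
// Each cleanup pass returns at most `max_files` candidates, so repeated
// passes drain a backlog without unbounded work in any single pass.
fn cleanup_pass(orphans: &mut Vec<&str>, max_files: usize) -> usize {
    let batch: Vec<&str> = orphans.iter().copied().take(max_files).collect();
    orphans.retain(|f| !batch.contains(f));
    batch.len() // number of files "deleted" by this pass
}

fn main() {
    let mut orphans = vec!["a", "b", "c"];
    assert_eq!(cleanup_pass(&mut orphans, 2), 2); // first pass deletes two
    assert_eq!(cleanup_pass(&mut orphans, 2), 1); // second pass gets the leftover
    assert!(orphans.is_empty());
}
```
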
@@ -265,17 +265,19 @@ impl PreservedCatalog {
     /// An empty transaction will be used to mark the catalog start so that concurrent open but
     /// still-empty catalogs can easily be detected.
     pub async fn new_empty<S>(
+        db_name: &str,
         iox_object_store: Arc<IoxObjectStore>,
         state_data: S::EmptyInput,
     ) -> Result<(Self, S)>
     where
         S: CatalogState + Send + Sync,
     {
-        Self::new_empty_inner::<S>(iox_object_store, state_data, None, None).await
+        Self::new_empty_inner::<S>(db_name, iox_object_store, state_data, None, None).await
     }

     /// Same as [`new_empty`](Self::new_empty) but for testing.
     pub async fn new_empty_for_testing<S>(
+        db_name: &str,
         iox_object_store: Arc<IoxObjectStore>,
         state_data: S::EmptyInput,
         fixed_uuid: Uuid,

@@ -285,6 +287,7 @@ impl PreservedCatalog {
         S: CatalogState + Send + Sync,
     {
         Self::new_empty_inner::<S>(
+            db_name,
             iox_object_store,
             state_data,
             Some(fixed_uuid),

@@ -294,6 +297,7 @@ impl PreservedCatalog {
     }

     pub async fn new_empty_inner<S>(
+        db_name: &str,
         iox_object_store: Arc<IoxObjectStore>,
         state_data: S::EmptyInput,
         fixed_uuid: Option<Uuid>,

@@ -305,7 +309,7 @@ impl PreservedCatalog {
         if Self::exists(&iox_object_store).await? {
             return Err(Error::AlreadyExists {});
         }
-        let state = S::new_empty(iox_object_store.database_name(), state_data);
+        let state = S::new_empty(db_name, state_data);

         let catalog = Self {
             previous_tkey: RwLock::new(None),
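
The substantive change in this file is visible in the hunk above: `S::new_empty` now receives the caller-supplied `db_name` instead of deriving a name from the object store. A reduced sketch of that contract; the trait here is a stand-in, and the real `CatalogState` carries more methods:

```rust
// Simplified shape of the state-construction contract after this commit:
// the database name is threaded in by the caller.
trait CatalogState {
    type EmptyInput;
    fn new_empty(db_name: &str, data: Self::EmptyInput) -> Self;
}

struct TracerState {
    db_name: String,
}

impl CatalogState for TracerState {
    type EmptyInput = ();
    fn new_empty(db_name: &str, _data: ()) -> Self {
        Self {
            db_name: db_name.to_owned(),
        }
    }
}

fn main() {
    let state = TracerState::new_empty("db1", ());
    assert_eq!(state.db_name, "db1");
}
```
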
@@ -331,6 +335,7 @@ impl PreservedCatalog {
     /// Loading starts at the latest checkpoint or -- if none exists -- at transaction `0`.
     /// Transactions before that point are neither verified nor are they required to exist.
     pub async fn load<S>(
+        db_name: &str,
         iox_object_store: Arc<IoxObjectStore>,
         state_data: S::EmptyInput,
     ) -> Result<Option<(Self, S)>>

@@ -397,7 +402,7 @@ impl PreservedCatalog {
         }

         // setup empty state
-        let mut state = S::new_empty(iox_object_store.database_name(), state_data);
+        let mut state = S::new_empty(db_name, state_data);
         let mut last_tkey = None;

         // detect replay start

@@ -1058,7 +1063,7 @@ mod tests {
     use super::*;
     use crate::catalog::test_helpers::{
         break_catalog_with_weird_version, create_delete_predicate, exists, load_err, load_ok,
-        new_empty, TestCatalogState,
+        new_empty, TestCatalogState, DB_NAME,
     };
     use crate::test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize};

@@ -1980,9 +1985,12 @@ mod tests {
         new_empty(&iox_object_store).await;

-        let res =
-            PreservedCatalog::new_empty::<TestCatalogState>(Arc::clone(&iox_object_store), ())
-                .await;
+        let res = PreservedCatalog::new_empty::<TestCatalogState>(
+            DB_NAME,
+            Arc::clone(&iox_object_store),
+            (),
+        )
+        .await;
         assert_eq!(res.unwrap_err().to_string(), "Catalog already exists");
     }
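
The test above pins the double-creation error message. A toy model of the guard it exercises; the real `exists` check inspects the object store rather than an in-memory set:

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum Error {
    AlreadyExists,
}

struct Store {
    catalogs: HashSet<String>,
}

impl Store {
    // Refuse to create a catalog where one already exists.
    fn new_empty(&mut self, db_name: &str) -> Result<(), Error> {
        if self.catalogs.contains(db_name) {
            return Err(Error::AlreadyExists);
        }
        self.catalogs.insert(db_name.to_owned());
        Ok(())
    }
}

fn main() {
    let mut store = Store {
        catalogs: HashSet::new(),
    };
    assert!(store.new_empty("db1").is_ok());
    assert_eq!(store.new_empty("db1"), Err(Error::AlreadyExists));
}
```
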
@@ -218,17 +218,17 @@ impl Debug for Metadata {

 #[cfg(test)]
 mod tests {
-    use chrono::{TimeZone, Utc};
-    use uuid::Uuid;
-
-    use super::*;
     use crate::{
         catalog::{
-            core::PreservedCatalog, interface::CatalogParquetInfo, test_helpers::TestCatalogState,
+            core::PreservedCatalog,
+            interface::CatalogParquetInfo,
+            test_helpers::{TestCatalogState, DB_NAME},
         },
         test_utils::{chunk_addr, make_iox_object_store, make_metadata, TestSize},
     };
+
+    use super::*;
+    use chrono::{TimeZone, Utc};
+    use uuid::Uuid;

     #[tokio::test]
     async fn test_dump_default_options() {

@@ -236,6 +236,7 @@ mod tests {

         // build catalog with some data
         let (catalog, _state) = PreservedCatalog::new_empty_for_testing::<TestCatalogState>(
+            DB_NAME,
             Arc::clone(&iox_object_store),
             (),
             Uuid::nil(),

@@ -355,6 +356,7 @@ File {

         // build catalog with some data
         let (catalog, _state) = PreservedCatalog::new_empty_for_testing::<TestCatalogState>(
+            DB_NAME,
             Arc::clone(&iox_object_store),
             (),
             Uuid::nil(),
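
The dump tests build their catalogs through `new_empty_for_testing` with `Uuid::nil()` because snapshot-style assertions need byte-stable output. A sketch of that design choice, assuming the external `uuid` crate (`uuid = { version = "1", features = ["v4"] }`):

```rust
use uuid::Uuid;

// Production paths take a fresh random ID; tests pin one so snapshot
// assertions never churn between runs.
fn transaction_id(fixed: Option<Uuid>) -> Uuid {
    fixed.unwrap_or_else(Uuid::new_v4)
}

fn main() {
    let id = transaction_id(Some(Uuid::nil()));
    assert_eq!(id.to_string(), "00000000-0000-0000-0000-000000000000");
}
```
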
@@ -68,6 +68,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// - **Metadata Read Failure:** There is a parquet file with metadata that cannot be read. Set
 ///   `ignore_metadata_read_failure` to `true` to ignore these cases.
 pub async fn rebuild_catalog<S>(
+    db_name: &str,
     iox_object_store: Arc<IoxObjectStore>,
     catalog_empty_input: S::EmptyInput,
     ignore_metadata_read_failure: bool,

@@ -79,10 +80,13 @@ where
     let files = collect_files(&iox_object_store, ignore_metadata_read_failure).await?;

     // create new empty catalog
-    let (catalog, mut state) =
-        PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), catalog_empty_input)
-            .await
-            .context(NewEmptyFailure)?;
+    let (catalog, mut state) = PreservedCatalog::new_empty::<S>(
+        db_name,
+        Arc::clone(&iox_object_store),
+        catalog_empty_input,
+    )
+    .await
+    .context(NewEmptyFailure)?;

     // create single transaction with all files
     if !files.is_empty() {
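
The rebuild path above reads as: collect parquet files from the store, create a fresh empty catalog, then register everything in a single transaction, with unreadable metadata either aborting or being skipped. A condensed synchronous sketch (all names are placeholders):

```rust
#[derive(Debug)]
struct Catalog {
    files: Vec<String>,
}

// Condensed rebuild flow: keep readable files, abort (or skip) on unreadable
// metadata, then commit the survivors as one batch.
fn rebuild_catalog(found: Vec<String>, ignore_read_failures: bool) -> Result<Catalog, String> {
    let mut files = Vec::new();
    for f in found {
        let readable = !f.ends_with(".bad"); // stand-in for a real metadata read
        if readable {
            files.push(f);
        } else if !ignore_read_failures {
            return Err(format!("Cannot read IOx metadata from parquet file {f}"));
        }
        // otherwise: the unreadable file is silently skipped
    }
    Ok(Catalog { files })
}

fn main() {
    let found = vec!["a.parquet".to_owned(), "b.bad".to_owned()];
    assert!(rebuild_catalog(found.clone(), false).is_err());
    assert_eq!(rebuild_catalog(found, true).unwrap().files.len(), 1);
}
```
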
@@ -172,7 +176,7 @@ mod tests {
     use crate::{
         catalog::{
             core::PreservedCatalog,
-            test_helpers::{exists, new_empty, TestCatalogState},
+            test_helpers::{exists, new_empty, TestCatalogState, DB_NAME},
         },
         metadata::IoxMetadata,
         storage::{MemWriter, Storage},

@@ -235,9 +239,10 @@ mod tests {
         PreservedCatalog::wipe(&iox_object_store).await.unwrap();

         // rebuild
-        let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), false)
-            .await
-            .unwrap();
+        let (catalog, state) =
+            rebuild_catalog::<TestCatalogState>(DB_NAME, iox_object_store, (), false)
+                .await
+                .unwrap();

         // check match
         let paths_actual = {

@@ -261,9 +266,10 @@ mod tests {
         PreservedCatalog::wipe(&iox_object_store).await.unwrap();

         // rebuild
-        let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), false)
-            .await
-            .unwrap();
+        let (catalog, state) =
+            rebuild_catalog::<TestCatalogState>(DB_NAME, iox_object_store, (), false)
+                .await
+                .unwrap();

         // check match
         assert!(state.files().next().is_none());

@@ -288,14 +294,16 @@ mod tests {

         // rebuild (do not ignore errors)
         let res =
-            rebuild_catalog::<TestCatalogState>(Arc::clone(&iox_object_store), (), false).await;
+            rebuild_catalog::<TestCatalogState>(DB_NAME, Arc::clone(&iox_object_store), (), false)
+                .await;
         assert!(dbg!(res.unwrap_err().to_string())
             .starts_with("Cannot read IOx metadata from parquet file"));

         // rebuild (ignore errors)
-        let (catalog, state) = rebuild_catalog::<TestCatalogState>(iox_object_store, (), true)
-            .await
-            .unwrap();
+        let (catalog, state) =
+            rebuild_catalog::<TestCatalogState>(DB_NAME, iox_object_store, (), true)
+                .await
+                .unwrap();
         assert!(state.files().next().is_none());
         assert_eq!(catalog.revision_counter(), 0);
     }

@@ -321,9 +329,10 @@ mod tests {
         PreservedCatalog::wipe(&iox_object_store).await.unwrap();

         // rebuild
-        let catalog = rebuild_catalog::<TestCatalogState>(Arc::clone(&iox_object_store), (), false)
-            .await
-            .unwrap();
+        let catalog =
+            rebuild_catalog::<TestCatalogState>(DB_NAME, Arc::clone(&iox_object_store), (), false)
+                .await
+                .unwrap();
         drop(catalog);

         // delete transaction files

@@ -29,6 +29,10 @@ use std::{
     sync::Arc,
 };

+/// Metrics need a database name, but what the database name is doesn't matter for what's tested
+/// in this crate. This is an arbitrary name that can be used wherever a database name is needed.
+pub const DB_NAME: &str = "db1";
+
 #[derive(Clone, Debug, Default)]
 pub struct Table {
     pub partitions: HashMap<Arc<str>, Partition>,

@@ -217,14 +221,14 @@ pub async fn exists(iox_object_store: &Arc<IoxObjectStore>) -> bool {
 pub async fn load_ok(
     iox_object_store: &Arc<IoxObjectStore>,
 ) -> Option<(PreservedCatalog, TestCatalogState)> {
-    PreservedCatalog::load(Arc::clone(iox_object_store), ())
+    PreservedCatalog::load(DB_NAME, Arc::clone(iox_object_store), ())
         .await
         .unwrap()
 }

 /// Load a `PreservedCatalog` and unwrap the error, expecting the operation to fail
 pub async fn load_err(iox_object_store: &Arc<IoxObjectStore>) -> crate::catalog::core::Error {
-    PreservedCatalog::load::<TestCatalogState>(Arc::clone(iox_object_store), ())
+    PreservedCatalog::load::<TestCatalogState>(DB_NAME, Arc::clone(iox_object_store), ())
         .await
         .unwrap_err()
 }

@@ -233,7 +237,7 @@ pub async fn load_err(iox_object_store: &Arc<IoxObjectStore>) -> crate::catalog::core::Error {
 pub async fn new_empty(
     iox_object_store: &Arc<IoxObjectStore>,
 ) -> (PreservedCatalog, TestCatalogState) {
-    PreservedCatalog::new_empty(Arc::clone(iox_object_store), ())
+    PreservedCatalog::new_empty(DB_NAME, Arc::clone(iox_object_store), ())
         .await
         .unwrap()
 }

@@ -272,7 +276,7 @@ where
     // empty state
     let iox_object_store = make_iox_object_store().await;
     let (_catalog, mut state) =
-        PreservedCatalog::new_empty::<S>(Arc::clone(&iox_object_store), state_data)
+        PreservedCatalog::new_empty::<S>(DB_NAME, Arc::clone(&iox_object_store), state_data)
             .await
             .unwrap();

@@ -212,6 +212,7 @@ impl Database {
             .context(SavingRules)?;

         create_preserved_catalog(
+            db_name,
             Arc::clone(&iox_object_store),
             Arc::clone(application.metric_registry()),
             true,

@@ -240,6 +240,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 pub struct Db {
     rules: RwLock<Arc<DatabaseRules>>,

+    name: Arc<str>,
+
     server_id: ServerId, // this is also the Query Server ID

     /// Interface to use for persistence

@@ -330,7 +332,7 @@ pub(crate) struct DatabaseToCommit {
 impl Db {
     pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Arc<Self> {
-        let db_name = database_to_commit.rules.name.clone();
+        let name = Arc::from(database_to_commit.rules.name.as_str());

         let rules = RwLock::new(database_to_commit.rules);
         let server_id = database_to_commit.server_id;

@@ -339,7 +341,7 @@ impl Db {
         let catalog = Arc::new(database_to_commit.catalog);

         let catalog_access = QueryCatalogAccess::new(
-            &db_name,
+            &*name,
             Arc::clone(&catalog),
             Arc::clone(&jobs),
             database_to_commit.metric_registry.as_ref(),

@@ -348,6 +350,7 @@ impl Db {
         let this = Self {
             rules,
+            name,
             server_id,
             iox_object_store,
             exec: database_to_commit.exec,

@@ -397,6 +400,10 @@ impl Db {
         Arc::clone(&*self.rules.read())
     }

+    pub fn name(&self) -> Arc<str> {
+        Arc::clone(&self.name)
+    }
+
     /// Updates the database rules
     pub fn update_rules(&self, new_rules: Arc<DatabaseRules>) {
         let late_arrive_window_updated = {
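
Two small idioms recur in the hunks above: `Arc::from(...as_str())` copies the name into a shared, immutable `Arc<str>` once, `&*name` reborrows it as a plain `&str`, and the new `name()` accessor hands out refcount bumps rather than fresh `String`s. A standalone sketch of the pattern:

```rust
use std::sync::Arc;

struct Db {
    name: Arc<str>,
}

impl Db {
    fn new(rules_name: &str) -> Self {
        // Arc::from copies the bytes once; clones after this bump a refcount.
        Self {
            name: Arc::from(rules_name),
        }
    }

    // Cheap accessor: callers get a shared handle, not a fresh String.
    fn name(&self) -> Arc<str> {
        Arc::clone(&self.name)
    }
}

fn takes_str(s: &str) -> usize {
    s.len()
}

fn main() {
    let db = Db::new("db1");
    // &* reborrows the Arc<str> as &str, as at the QueryCatalogAccess::new call site.
    assert_eq!(takes_str(&*db.name()), 3);
    assert_eq!(&*db.name(), "db1");
}
```
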
@@ -936,7 +943,8 @@ impl Db {
         self: &Arc<Self>,
     ) -> std::result::Result<(), parquet_file::catalog::cleanup::Error> {
         let guard = self.cleanup_lock.write().await;
-        let files = get_unreferenced_parquet_files(&self.preserved_catalog, 1_000).await?;
+        let files =
+            get_unreferenced_parquet_files(&self.name(), &self.preserved_catalog, 1_000).await?;
         drop(guard);

         delete_parquet_files(&self.preserved_catalog, &files).await

@@ -58,6 +58,7 @@ pub async fn load_or_create_preserved_catalog(
 ) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
     // first try to load existing catalogs
     match PreservedCatalog::load(
+        db_name,
         Arc::clone(&iox_object_store),
         LoaderEmptyInput::new(Arc::clone(&metric_registry), skip_replay),
     )

@@ -82,8 +83,13 @@ pub async fn load_or_create_preserved_catalog(
                 db_name
             );

-            create_preserved_catalog(Arc::clone(&iox_object_store), metric_registry, skip_replay)
-                .await
+            create_preserved_catalog(
+                db_name,
+                Arc::clone(&iox_object_store),
+                metric_registry,
+                skip_replay,
+            )
+            .await
         }
         Err(e) => {
             if wipe_on_error {

@@ -96,6 +102,7 @@ pub async fn load_or_create_preserved_catalog(
                 .context(CannotWipeCatalog)?;

             create_preserved_catalog(
+                db_name,
                 Arc::clone(&iox_object_store),
                 metric_registry,
                 skip_replay,

@@ -112,11 +119,13 @@ pub async fn load_or_create_preserved_catalog(
 ///
 /// This will fail if a preserved catalog already exists.
 pub async fn create_preserved_catalog(
+    db_name: &str,
     iox_object_store: Arc<IoxObjectStore>,
     metric_registry: Arc<metric::Registry>,
     skip_replay: bool,
 ) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
     let (preserved_catalog, loader) = PreservedCatalog::new_empty(
+        db_name,
         Arc::clone(&iox_object_store),
         LoaderEmptyInput::new(metric_registry, skip_replay),
     )