parent
3942622a93
commit
cf55df68b5
|
@ -318,14 +318,14 @@ impl PreservedCatalog {
|
|||
server_id: ServerId,
|
||||
db_name: String,
|
||||
state_data: S::EmptyInput,
|
||||
) -> Result<(Self, Arc<S>)>
|
||||
) -> Result<(Self, S)>
|
||||
where
|
||||
S: CatalogState + Send + Sync,
|
||||
{
|
||||
if Self::exists(&object_store, server_id, &db_name).await? {
|
||||
return Err(Error::AlreadyExists {});
|
||||
}
|
||||
let state = Arc::new(S::new_empty(&db_name, state_data));
|
||||
let state = S::new_empty(&db_name, state_data);
|
||||
|
||||
let catalog = Self {
|
||||
previous_tkey: RwLock::new(None),
|
||||
|
@ -355,7 +355,7 @@ impl PreservedCatalog {
|
|||
server_id: ServerId,
|
||||
db_name: String,
|
||||
state_data: S::EmptyInput,
|
||||
) -> Result<Option<(Self, Arc<S>)>>
|
||||
) -> Result<Option<(Self, S)>>
|
||||
where
|
||||
S: CatalogState + Send + Sync,
|
||||
{
|
||||
|
@ -457,7 +457,7 @@ impl PreservedCatalog {
|
|||
server_id,
|
||||
db_name,
|
||||
},
|
||||
Arc::new(state),
|
||||
state,
|
||||
)))
|
||||
}
|
||||
|
||||
|
@ -1330,7 +1330,7 @@ pub mod test_helpers {
|
|||
{
|
||||
// empty state
|
||||
let object_store = make_object_store();
|
||||
let (_catalog, state) = PreservedCatalog::new_empty::<S>(
|
||||
let (_catalog, mut state) = PreservedCatalog::new_empty::<S>(
|
||||
Arc::clone(&object_store),
|
||||
ServerId::try_from(1).unwrap(),
|
||||
"db1".to_string(),
|
||||
|
@ -1338,7 +1338,6 @@ pub mod test_helpers {
|
|||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut state = Arc::try_unwrap(state).unwrap();
|
||||
let mut expected = HashMap::new();
|
||||
assert_checkpoint(&state, &f, &expected);
|
||||
|
||||
|
@ -2502,7 +2501,7 @@ mod tests {
|
|||
let mut trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
|
||||
|
||||
// re-open catalog
|
||||
let (catalog, state) = PreservedCatalog::load::<TestCatalogState>(
|
||||
let (catalog, mut state) = PreservedCatalog::load::<TestCatalogState>(
|
||||
Arc::clone(&object_store),
|
||||
server_id,
|
||||
db_name.to_string(),
|
||||
|
@ -2511,7 +2510,6 @@ mod tests {
|
|||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let mut state = Arc::try_unwrap(state).unwrap();
|
||||
|
||||
// create empty transaction w/ checkpoint (the delta transaction file is not required for catalog loading)
|
||||
{
|
||||
|
@ -2688,7 +2686,7 @@ mod tests {
|
|||
server_id: ServerId,
|
||||
db_name: &str,
|
||||
) -> TestTrace {
|
||||
let (catalog, state) = PreservedCatalog::new_empty(
|
||||
let (catalog, mut state) = PreservedCatalog::new_empty(
|
||||
Arc::clone(&object_store),
|
||||
server_id,
|
||||
db_name.to_string(),
|
||||
|
@ -2696,7 +2694,6 @@ mod tests {
|
|||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut state = Arc::try_unwrap(state).unwrap();
|
||||
|
||||
// get some test metadata
|
||||
let (_, metadata1) = make_metadata(object_store, "foo", chunk_addr(1)).await;
|
||||
|
|
|
@ -100,7 +100,7 @@ pub async fn rebuild_catalog<S>(
|
|||
db_name: String,
|
||||
catalog_empty_input: S::EmptyInput,
|
||||
ignore_metadata_read_failure: bool,
|
||||
) -> Result<(PreservedCatalog, Arc<S>)>
|
||||
) -> Result<(PreservedCatalog, S)>
|
||||
where
|
||||
S: CatalogState + Debug + Send + Sync,
|
||||
{
|
||||
|
@ -109,7 +109,7 @@ where
|
|||
collect_revisions(&object_store, search_location, ignore_metadata_read_failure).await?;
|
||||
|
||||
// create new empty catalog
|
||||
let (catalog, state) = PreservedCatalog::new_empty::<S>(
|
||||
let (catalog, mut state) = PreservedCatalog::new_empty::<S>(
|
||||
Arc::clone(&object_store),
|
||||
server_id,
|
||||
db_name,
|
||||
|
@ -117,7 +117,6 @@ where
|
|||
)
|
||||
.await
|
||||
.context(NewEmptyFailure)?;
|
||||
let mut state = Arc::try_unwrap(state).expect("dangling Arc?");
|
||||
|
||||
// trace all files for final checkpoint
|
||||
let mut collected_files = HashMap::new();
|
||||
|
@ -171,7 +170,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
Ok((catalog, Arc::new(state)))
|
||||
Ok((catalog, state))
|
||||
}
|
||||
|
||||
/// Collect all files under the given locations.
|
||||
|
@ -298,7 +297,7 @@ mod tests {
|
|||
let db_name = "db1";
|
||||
|
||||
// build catalog with some data
|
||||
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
|
||||
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
|
||||
Arc::clone(&object_store),
|
||||
server_id,
|
||||
db_name.to_string(),
|
||||
|
@ -306,7 +305,6 @@ mod tests {
|
|||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut state = Arc::try_unwrap(state).unwrap();
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
|
@ -607,7 +605,7 @@ mod tests {
|
|||
let db_name = "db1";
|
||||
|
||||
// build catalog with some data (2 transactions + initial empty one)
|
||||
let (catalog, state) = PreservedCatalog::new_empty::<TestCatalogState>(
|
||||
let (catalog, mut state) = PreservedCatalog::new_empty::<TestCatalogState>(
|
||||
Arc::clone(&object_store),
|
||||
server_id,
|
||||
db_name.to_string(),
|
||||
|
@ -615,7 +613,6 @@ mod tests {
|
|||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut state = Arc::try_unwrap(state).unwrap();
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
|
|
|
@ -238,7 +238,7 @@ impl Config {
|
|||
object_store: Arc<ObjectStore>,
|
||||
exec: Arc<Executor>,
|
||||
preserved_catalog: PreservedCatalog,
|
||||
catalog: Arc<Catalog>,
|
||||
catalog: Catalog,
|
||||
) {
|
||||
let mut state = self.state.write().expect("mutex poisoned");
|
||||
let name = rules.name.clone();
|
||||
|
@ -453,7 +453,7 @@ impl<'a> CreateDatabaseHandle<'a> {
|
|||
object_store: Arc<ObjectStore>,
|
||||
exec: Arc<Executor>,
|
||||
preserved_catalog: PreservedCatalog,
|
||||
catalog: Arc<Catalog>,
|
||||
catalog: Catalog,
|
||||
rules: DatabaseRules,
|
||||
) -> Result<()> {
|
||||
let db_name = self.db_name.take().expect("not committed");
|
||||
|
@ -540,7 +540,7 @@ impl<'a> RecoverDatabaseHandle<'a> {
|
|||
object_store: Arc<ObjectStore>,
|
||||
exec: Arc<Executor>,
|
||||
preserved_catalog: PreservedCatalog,
|
||||
catalog: Arc<Catalog>,
|
||||
catalog: Catalog,
|
||||
rules: Option<DatabaseRules>,
|
||||
) -> Result<()> {
|
||||
let db_name = self.db_name.take().expect("not committed");
|
||||
|
|
|
@ -284,7 +284,7 @@ pub async fn load_or_create_preserved_catalog(
|
|||
server_id: ServerId,
|
||||
metrics_registry: Arc<MetricRegistry>,
|
||||
wipe_on_error: bool,
|
||||
) -> std::result::Result<(PreservedCatalog, Arc<Catalog>), parquet_file::catalog::Error> {
|
||||
) -> std::result::Result<(PreservedCatalog, Catalog), parquet_file::catalog::Error> {
|
||||
let metric_labels = vec![
|
||||
KeyValue::new("db_name", db_name.to_string()),
|
||||
KeyValue::new("svr_id", format!("{}", server_id)),
|
||||
|
@ -369,7 +369,7 @@ impl Db {
|
|||
exec: Arc<Executor>,
|
||||
jobs: Arc<JobRegistry>,
|
||||
preserved_catalog: PreservedCatalog,
|
||||
catalog: Arc<Catalog>,
|
||||
catalog: Catalog,
|
||||
write_buffer: Option<Arc<dyn WriteBuffer>>,
|
||||
) -> Self {
|
||||
let db_name = rules.name.clone();
|
||||
|
@ -379,6 +379,7 @@ impl Db {
|
|||
let store = Arc::clone(&object_store);
|
||||
let metrics_registry = Arc::clone(&catalog.metrics_registry);
|
||||
let metric_labels = catalog.metric_labels.clone();
|
||||
let catalog = Arc::new(catalog);
|
||||
|
||||
let catalog_access = QueryCatalogAccess::new(
|
||||
&db_name,
|
||||
|
|
Loading…
Reference in New Issue