refactor: Extract a struct to hold all the arguments needed to make a Db

pull/24376/head
Carol (Nichols || Goulding) 2021-06-23 14:23:05 -04:00
parent f903b6eca8
commit c0c1c3fd8e
5 changed files with 102 additions and 129 deletions

View File

@ -11,9 +11,8 @@ use query::exec::Executor;
/// This module contains code for managing the configuration of the server.
use crate::{
db::{catalog::Catalog, Db},
write_buffer::{self, WriteBuffer},
Error, JobRegistry, Result,
db::{catalog::Catalog, DatabaseToCommit, Db},
write_buffer, Error, JobRegistry, Result,
};
use observability_deps::tracing::{self, error, info, warn, Instrument};
use tokio::task::JoinHandle;
@ -231,34 +230,16 @@ impl Config {
}
/// Creates database in initialized state.
fn commit_db(
&self,
rules: DatabaseRules,
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog,
catalog: Catalog,
write_buffer: Option<Arc<dyn WriteBuffer>>,
) {
fn commit_db(&self, database_to_commit: DatabaseToCommit) {
let mut state = self.state.write().expect("mutex poisoned");
let name = rules.name.clone();
let name = database_to_commit.rules.name.clone();
if self.shutdown.is_cancelled() {
error!("server is shutting down");
return;
}
let db = Arc::new(Db::new(
rules,
server_id,
object_store,
exec,
Arc::clone(&self.jobs),
preserved_catalog,
catalog,
write_buffer,
));
let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.jobs)));
let shutdown = self.shutdown.child_token();
let shutdown_captured = shutdown.clone();
@ -439,34 +420,17 @@ impl<'a> CreateDatabaseHandle<'a> {
///
/// Will fail if database name used to create this handle and the name within `rules` do not match. In this case,
/// the database will be de-registered.
pub(crate) fn commit_db(
mut self,
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
preserved_catalog: PreservedCatalog,
catalog: Catalog,
rules: DatabaseRules,
write_buffer: Option<Arc<dyn WriteBuffer>>,
) -> Result<()> {
pub(crate) fn commit_db(mut self, database_to_commit: DatabaseToCommit) -> Result<()> {
let db_name = self.db_name.take().expect("not committed");
if db_name != rules.name {
if db_name != database_to_commit.rules.name {
self.config.forget_reservation(&db_name);
return Err(Error::RulesDatabaseNameMismatch {
actual: rules.name.to_string(),
actual: database_to_commit.rules.name.to_string(),
expected: db_name.to_string(),
});
}
self.config.commit_db(
rules,
server_id,
object_store,
exec,
preserved_catalog,
catalog,
write_buffer,
);
self.config.commit_db(database_to_commit);
Ok(())
}
@ -554,15 +518,17 @@ impl<'a> RecoverDatabaseHandle<'a> {
let write_buffer = write_buffer::new(&rules)
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
self.config.commit_db(
rules,
let database_to_commit = DatabaseToCommit {
server_id,
object_store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer,
);
};
self.config.commit_db(database_to_commit);
Ok(())
}
@ -650,17 +616,17 @@ mod test {
{
let db_reservation = config.create_db(DatabaseName::new("bar").unwrap()).unwrap();
let err = db_reservation
.commit_db(
server_id,
Arc::clone(&store),
Arc::clone(&exec),
preserved_catalog,
catalog,
rules.clone(),
None,
)
.unwrap_err();
let database_to_commit = DatabaseToCommit {
server_id,
object_store: Arc::clone(&store),
exec: Arc::clone(&exec),
preserved_catalog,
catalog,
rules: rules.clone(),
write_buffer: None,
};
let err = db_reservation.commit_db(database_to_commit).unwrap_err();
assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. }));
}
@ -674,17 +640,16 @@ mod test {
.await
.unwrap();
let db_reservation = config.create_db(name.clone()).unwrap();
db_reservation
.commit_db(
server_id,
store,
exec,
preserved_catalog,
catalog,
rules,
None,
)
.unwrap();
let database_to_commit = DatabaseToCommit {
server_id,
object_store: store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer: None,
};
db_reservation.commit_db(database_to_commit).unwrap();
assert!(config.db(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]);
@ -863,17 +828,18 @@ mod test {
)
.await
.unwrap();
db_reservation
.commit_db(
server_id,
store,
exec,
preserved_catalog,
catalog,
rules,
None,
)
.unwrap();
let database_to_commit = DatabaseToCommit {
server_id,
object_store: store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer: None,
};
db_reservation.commit_db(database_to_commit).unwrap();
let token = config
.state

View File

@ -360,26 +360,28 @@ pub async fn load_or_create_preserved_catalog(
}
}
impl Db {
#[allow(clippy::clippy::too_many_arguments)]
pub fn new(
rules: DatabaseRules,
server_id: ServerId,
object_store: Arc<ObjectStore>,
exec: Arc<Executor>,
jobs: Arc<JobRegistry>,
preserved_catalog: PreservedCatalog,
catalog: Catalog,
write_buffer: Option<Arc<dyn WriteBuffer>>,
) -> Self {
let db_name = rules.name.clone();
/// All the information needed to commit a database
#[derive(Debug)]
pub(crate) struct DatabaseToCommit {
pub(crate) server_id: ServerId,
pub(crate) object_store: Arc<ObjectStore>,
pub(crate) exec: Arc<Executor>,
pub(crate) preserved_catalog: PreservedCatalog,
pub(crate) catalog: Catalog,
pub(crate) rules: DatabaseRules,
pub(crate) write_buffer: Option<Arc<dyn WriteBuffer>>,
}
let rules = RwLock::new(rules);
let server_id = server_id;
let store = Arc::clone(&object_store);
let metrics_registry = Arc::clone(&catalog.metrics_registry);
let metric_labels = catalog.metric_labels.clone();
let catalog = Arc::new(catalog);
impl Db {
pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Self {
let db_name = database_to_commit.rules.name.clone();
let rules = RwLock::new(database_to_commit.rules);
let server_id = database_to_commit.server_id;
let store = Arc::clone(&database_to_commit.object_store);
let metrics_registry = Arc::clone(&database_to_commit.catalog.metrics_registry);
let metric_labels = database_to_commit.catalog.metric_labels.clone();
let catalog = Arc::new(database_to_commit.catalog);
let catalog_access = QueryCatalogAccess::new(
&db_name,
@ -396,8 +398,8 @@ impl Db {
rules,
server_id,
store,
exec,
preserved_catalog: Arc::new(preserved_catalog),
exec: database_to_commit.exec,
preserved_catalog: Arc::new(database_to_commit.preserved_catalog),
catalog,
jobs,
metrics_registry,
@ -406,7 +408,7 @@ impl Db {
worker_iterations_lifecycle: AtomicUsize::new(0),
worker_iterations_cleanup: AtomicUsize::new(0),
metric_labels,
write_buffer,
write_buffer: database_to_commit.write_buffer,
}
}

View File

@ -23,7 +23,7 @@ use tokio::sync::Semaphore;
use crate::{
config::{Config, DB_RULES_FILE_NAME},
db::load_or_create_preserved_catalog,
db::{load_or_create_preserved_catalog, DatabaseToCommit},
write_buffer, DatabaseError,
};
@ -332,17 +332,19 @@ impl InitStatus {
let write_buffer =
write_buffer::new(&rules).context(CreateWriteBufferForWriting)?;
let database_to_commit = DatabaseToCommit {
server_id,
object_store: store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer,
};
// everything is there, can create DB
handle
.commit_db(
server_id,
store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer,
)
.commit_db(database_to_commit)
.map_err(Box::new)
.context(CreateDbError)?;
Ok(())

View File

@ -73,7 +73,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use db::load_or_create_preserved_catalog;
use db::{load_or_create_preserved_catalog, DatabaseToCommit};
use init::InitStatus;
use observability_deps::tracing::{debug, info, warn};
use parking_lot::Mutex;
@ -504,15 +504,17 @@ where
let write_buffer = write_buffer::new(&rules)
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
db_reservation.commit_db(
let database_to_commit = DatabaseToCommit {
server_id,
Arc::clone(&self.store),
Arc::clone(&self.exec),
object_store: Arc::clone(&self.store),
exec: Arc::clone(&self.exec),
preserved_catalog,
catalog,
rules,
write_buffer,
)?;
};
db_reservation.commit_db(database_to_commit)?;
Ok(())
}

View File

@ -8,7 +8,7 @@ use object_store::{memory::InMemory, ObjectStore};
use query::{exec::Executor, QueryDatabase};
use crate::{
db::{load_or_create_preserved_catalog, Db},
db::{load_or_create_preserved_catalog, DatabaseToCommit, Db},
write_buffer::WriteBuffer,
JobRegistry,
};
@ -78,18 +78,19 @@ impl TestDbBuilder {
rules.lifecycle_rules.catalog_transactions_until_checkpoint =
self.catalog_transactions_until_checkpoint;
let database_to_commit = DatabaseToCommit {
rules,
server_id,
object_store,
preserved_catalog,
catalog,
write_buffer: self.write_buffer,
exec,
};
TestDb {
metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
db: Db::new(
rules,
server_id,
object_store,
exec,
Arc::new(JobRegistry::new()),
preserved_catalog,
catalog,
self.write_buffer,
),
db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
}
}