fix: Create WriteBuffer outside of commit_db so committing can't fail
parent
51e72c8821
commit
f903b6eca8
|
@ -12,7 +12,8 @@ use query::exec::Executor;
|
|||
/// This module contains code for managing the configuration of the server.
|
||||
use crate::{
|
||||
db::{catalog::Catalog, Db},
|
||||
write_buffer, Error, JobRegistry, Result,
|
||||
write_buffer::{self, WriteBuffer},
|
||||
Error, JobRegistry, Result,
|
||||
};
|
||||
use observability_deps::tracing::{self, error, info, warn, Instrument};
|
||||
use tokio::task::JoinHandle;
|
||||
|
@ -238,18 +239,16 @@ impl Config {
|
|||
exec: Arc<Executor>,
|
||||
preserved_catalog: PreservedCatalog,
|
||||
catalog: Catalog,
|
||||
) -> Result<()> {
|
||||
write_buffer: Option<Arc<dyn WriteBuffer>>,
|
||||
) {
|
||||
let mut state = self.state.write().expect("mutex poisoned");
|
||||
let name = rules.name.clone();
|
||||
|
||||
if self.shutdown.is_cancelled() {
|
||||
error!("server is shutting down");
|
||||
return Ok(());
|
||||
return;
|
||||
}
|
||||
|
||||
let write_buffer = write_buffer::new(&rules)
|
||||
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
|
||||
|
||||
let db = Arc::new(Db::new(
|
||||
rules,
|
||||
server_id,
|
||||
|
@ -286,7 +285,6 @@ impl Config {
|
|||
.is_none());
|
||||
state.reservations.remove(&name);
|
||||
state.uninitialized_databases.remove(&name);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a database in an uninitialized state but remembers rules that are supposed to use for the database.
|
||||
|
@ -449,6 +447,7 @@ impl<'a> CreateDatabaseHandle<'a> {
|
|||
preserved_catalog: PreservedCatalog,
|
||||
catalog: Catalog,
|
||||
rules: DatabaseRules,
|
||||
write_buffer: Option<Arc<dyn WriteBuffer>>,
|
||||
) -> Result<()> {
|
||||
let db_name = self.db_name.take().expect("not committed");
|
||||
if db_name != rules.name {
|
||||
|
@ -466,7 +465,8 @@ impl<'a> CreateDatabaseHandle<'a> {
|
|||
exec,
|
||||
preserved_catalog,
|
||||
catalog,
|
||||
)?;
|
||||
write_buffer,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -551,6 +551,9 @@ impl<'a> RecoverDatabaseHandle<'a> {
|
|||
});
|
||||
}
|
||||
|
||||
let write_buffer = write_buffer::new(&rules)
|
||||
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
|
||||
|
||||
self.config.commit_db(
|
||||
rules,
|
||||
server_id,
|
||||
|
@ -558,7 +561,8 @@ impl<'a> RecoverDatabaseHandle<'a> {
|
|||
exec,
|
||||
preserved_catalog,
|
||||
catalog,
|
||||
)?;
|
||||
write_buffer,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -654,6 +658,7 @@ mod test {
|
|||
preserved_catalog,
|
||||
catalog,
|
||||
rules.clone(),
|
||||
None,
|
||||
)
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. }));
|
||||
|
@ -670,7 +675,15 @@ mod test {
|
|||
.unwrap();
|
||||
let db_reservation = config.create_db(name.clone()).unwrap();
|
||||
db_reservation
|
||||
.commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
|
||||
.commit_db(
|
||||
server_id,
|
||||
store,
|
||||
exec,
|
||||
preserved_catalog,
|
||||
catalog,
|
||||
rules,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
assert!(config.db(&name).is_some());
|
||||
assert_eq!(config.db_names_sorted(), vec![name.clone()]);
|
||||
|
@ -851,7 +864,15 @@ mod test {
|
|||
.await
|
||||
.unwrap();
|
||||
db_reservation
|
||||
.commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
|
||||
.commit_db(
|
||||
server_id,
|
||||
store,
|
||||
exec,
|
||||
preserved_catalog,
|
||||
catalog,
|
||||
rules,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let token = config
|
||||
|
|
|
@ -24,7 +24,7 @@ use tokio::sync::Semaphore;
|
|||
use crate::{
|
||||
config::{Config, DB_RULES_FILE_NAME},
|
||||
db::load_or_create_preserved_catalog,
|
||||
DatabaseError,
|
||||
write_buffer, DatabaseError,
|
||||
};
|
||||
|
||||
const STORE_ERROR_PAUSE_SECONDS: u64 = 100;
|
||||
|
@ -71,6 +71,9 @@ pub enum Error {
|
|||
DatabaseNameError {
|
||||
source: data_types::DatabaseNameError,
|
||||
},
|
||||
|
||||
#[snafu(display("Cannot create write buffer for writing: {}", source))]
|
||||
CreateWriteBufferForWriting { source: DatabaseError },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -326,9 +329,20 @@ impl InitStatus {
|
|||
.context(CatalogLoadError)
|
||||
{
|
||||
Ok((preserved_catalog, catalog)) => {
|
||||
let write_buffer =
|
||||
write_buffer::new(&rules).context(CreateWriteBufferForWriting)?;
|
||||
|
||||
// everything is there, can create DB
|
||||
handle
|
||||
.commit_db(server_id, store, exec, preserved_catalog, catalog, rules)
|
||||
.commit_db(
|
||||
server_id,
|
||||
store,
|
||||
exec,
|
||||
preserved_catalog,
|
||||
catalog,
|
||||
rules,
|
||||
write_buffer,
|
||||
)
|
||||
.map_err(Box::new)
|
||||
.context(CreateDbError)?;
|
||||
Ok(())
|
||||
|
|
|
@ -501,6 +501,9 @@ where
|
|||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CatalogLoadError)?;
|
||||
|
||||
let write_buffer = write_buffer::new(&rules)
|
||||
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
|
||||
|
||||
db_reservation.commit_db(
|
||||
server_id,
|
||||
Arc::clone(&self.store),
|
||||
|
@ -508,6 +511,7 @@ where
|
|||
preserved_catalog,
|
||||
catalog,
|
||||
rules,
|
||||
write_buffer,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in New Issue