From 9ea04a42ff6a5a64b5f30df111442dfef21e6d83 Mon Sep 17 00:00:00 2001
From: Marco Neumann <marco@crepererum.net>
Date: Mon, 2 Aug 2021 15:17:22 +0200
Subject: [PATCH] refactor: start background worker before performing replay

This enables compaction during replay.
---
 server/src/config.rs    | 68 ++++++++++++++++++++++++++---------------
 server/src/db.rs        | 39 +++++++++++++++++++++--
 server/src/db/replay.rs | 66 +++++++++++++++++++++------------------
 server/src/init.rs      |  2 ++
 server/src/lib.rs       |  5 +++
 5 files changed, 123 insertions(+), 57 deletions(-)

diff --git a/server/src/config.rs b/server/src/config.rs
index 34fbf42ae8..f375c4d664 100644
--- a/server/src/config.rs
+++ b/server/src/config.rs
@@ -9,6 +9,7 @@ use data_types::{
 };
 use metrics::MetricRegistry;
 use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
+use parking_lot::Mutex;
 use parquet_file::catalog::PreservedCatalog;
 use persistence_windows::checkpoint::ReplayPlan;
 use write_buffer::config::WriteBufferConfig;
@@ -320,12 +321,14 @@ enum DatabaseState {
     Replay {
         db: Arc<Db>,
         replay_plan: ReplayPlan,
+        handle: Mutex<Option<JoinHandle<()>>>,
+        shutdown: CancellationToken,
     },

     /// Fully initialized database.
     Initialized {
         db: Arc<Db>,
-        handle: Option<JoinHandle<()>>,
+        handle: Mutex<Option<JoinHandle<()>>>,
         shutdown: CancellationToken,
     },
 }
@@ -333,7 +336,8 @@ impl DatabaseState {
     fn join(&mut self) -> Option<JoinHandle<()>> {
         match self {
-            DatabaseState::Initialized { handle, .. } => handle.take(),
+            DatabaseState::Replay { handle, .. } => handle.lock().take(),
+            DatabaseState::Initialized { handle, .. } => handle.lock().take(),
             _ => None,
         }
     }
@@ -396,11 +400,14 @@ impl DatabaseState {

 impl Drop for DatabaseState {
     fn drop(&mut self) {
-        if let DatabaseState::Initialized {
+        if let DatabaseState::Replay {
+            handle, shutdown, ..
+        }
+        | DatabaseState::Initialized {
             handle, shutdown, ..
         } = self
         {
-            if handle.is_some() {
+            if handle.lock().is_some() {
                 // Join should be called on `DatabaseState` prior to dropping, for example, by
                 // calling drain() on the owning `Config`
                 warn!("DatabaseState dropped without waiting for background task to complete");
@@ -545,7 +552,29 @@ impl<'a> DatabaseHandle<'a> {
         let db = Db::new(database_to_commit, Arc::clone(application.job_registry()));

-        self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan }));
+        if self.config.shutdown.is_cancelled() {
+            error!("server is shutting down");
+            return ServerShuttingDown.fail();
+        }
+
+        let shutdown = self.config.shutdown.child_token();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let rules = db.rules();
+
+        let handle = Mutex::new(Some(tokio::spawn(async move {
+            db_captured
+                .background_worker(shutdown_captured)
+                .instrument(tracing::info_span!("db_worker", database=%rules.name))
+                .await
+        })));
+
+        self.state = Some(Arc::new(DatabaseState::Replay {
+            db,
+            replay_plan,
+            handle,
+            shutdown,
+        }));

         Ok(())
     }

@@ -560,28 +589,17 @@ impl<'a> DatabaseHandle<'a> {
     /// Advance database state to [`Initialized`](DatabaseStateCode::Initialized).
     pub fn advance_init(&mut self) -> Result<()> {
         match self.state().as_ref() {
-            DatabaseState::Replay { db, .. } => {
-                if self.config.shutdown.is_cancelled() {
-                    error!("server is shutting down");
-                    return ServerShuttingDown.fail();
-                }
-
-                let shutdown = self.config.shutdown.child_token();
-                let shutdown_captured = shutdown.clone();
-                let db_captured = Arc::clone(db);
-                let rules = db.rules();
-
-                let handle = Some(tokio::spawn(async move {
-                    db_captured
-                        .background_worker(shutdown_captured)
-                        .instrument(tracing::info_span!("db_worker", database=%rules.name))
-                        .await
-                }));
-
+            DatabaseState::Replay {
+                db,
+                handle,
+                shutdown,
+                ..
+            } => {
+                let handle = handle.lock().take();
                 self.state = Some(Arc::new(DatabaseState::Initialized {
                     db: Arc::clone(db),
-                    handle,
-                    shutdown,
+                    handle: Mutex::new(handle),
+                    shutdown: shutdown.clone(),
                 }));

                 Ok(())
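Note: the `Mutex<Option<JoinHandle<()>>>` plus child-token pattern used above can be reduced to a small self-contained sketch. This assumes the same crates as the patch (`tokio`, `tokio-util`, `parking_lot`); `WorkerHandle` and the trivial `background_worker` are illustrative names only, not part of this change:

    use parking_lot::Mutex;
    use tokio::task::JoinHandle;
    use tokio_util::sync::CancellationToken;

    // Stand-in for `Db::background_worker`: runs until told to stop.
    async fn background_worker(shutdown: CancellationToken) {
        shutdown.cancelled().await;
    }

    struct WorkerHandle {
        // `Mutex<Option<...>>` lets callers take the handle exactly once
        // through a shared reference -- the reason the patch replaces
        // `Option<JoinHandle<()>>` with `Mutex<Option<JoinHandle<()>>>`.
        handle: Mutex<Option<JoinHandle<()>>>,
        shutdown: CancellationToken,
    }

    impl WorkerHandle {
        fn spawn(parent: &CancellationToken) -> Self {
            // A child token is cancelled together with the server-wide
            // parent, but can also be cancelled on its own (per database).
            let shutdown = parent.child_token();
            let captured = shutdown.clone();
            Self {
                handle: Mutex::new(Some(tokio::spawn(background_worker(captured)))),
                shutdown,
            }
        }

        fn join(&self) -> Option<JoinHandle<()>> {
            self.handle.lock().take()
        }
    }

    #[tokio::main]
    async fn main() {
        let parent = CancellationToken::new();
        let worker = WorkerHandle::spawn(&parent);

        // Stop just this worker and wait for it; `join()` yields the
        // handle only once, exactly like `DatabaseState::join`.
        worker.shutdown.cancel();
        if let Some(handle) = worker.join() {
            handle.await.unwrap();
        }
    }

The same handle can thus be carried unchanged from `Replay` into `Initialized`, which is what `advance_init` now does instead of spawning the worker itself.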
diff --git a/server/src/db.rs b/server/src/db.rs
index f5591368a9..32d5ef9a78 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -45,7 +45,7 @@ use std::{
     any::Any,
     collections::HashMap,
     sync::{
-        atomic::{AtomicUsize, Ordering},
+        atomic::{AtomicBool, AtomicUsize, Ordering},
         Arc,
     },
     time::{Duration, Instant},
@@ -372,6 +372,11 @@ pub struct Db {
     ///
     /// Optional because it will be created after `Arc<Self>`.
     lifcycle_policy: tokio::sync::Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>,
+
+    /// Flag to stop background worker from reading from the write buffer.
+    ///
+    /// TODO: Move write buffer read loop out of Db.
+    no_write_buffer_read: AtomicBool,
 }

 /// All the information needed to commit a database
@@ -431,14 +436,31 @@ impl Db {
             write_buffer: database_to_commit.write_buffer,
             cleanup_lock: Default::default(),
             lifcycle_policy: tokio::sync::Mutex::new(None),
+            no_write_buffer_read: AtomicBool::new(true),
         };
         let this = Arc::new(this);
         *this.lifcycle_policy.try_lock().expect("not used yet") = Some(
-            ::lifecycle::LifecyclePolicy::new(WeakDb(Arc::downgrade(&this))),
+            ::lifecycle::LifecyclePolicy::new_suppress_persistence(WeakDb(Arc::downgrade(&this))),
         );
         this
     }

+    /// Allow persistence if the database rules allow it.
+    pub async fn unsuppress_persistence(&self) {
+        let mut guard = self.lifcycle_policy.lock().await;
+        let policy = guard
+            .as_mut()
+            .expect("lifecycle policy should be initialized");
+        policy.unsuppress_persistence();
+    }
+
+    /// Allow continuous reads from the write buffer (if configured).
+    ///
+    /// TODO: Move write buffer read loop out of Db.
+    pub fn allow_write_buffer_read(&self) {
+        self.no_write_buffer_read.store(false, Ordering::SeqCst);
+    }
+
     /// Return a handle to the executor used to run queries
     pub fn executor(&self) -> Arc<Executor> {
         Arc::clone(&self.exec)
@@ -814,6 +836,16 @@ impl Db {
             // streaming from the write buffer loop
             async {
                 if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
+                    // wait for permission
+                    tokio::select! {
+                        _ = async {
+                            while self.no_write_buffer_read.load(Ordering::SeqCst) {
+                                tokio::time::sleep(Duration::from_millis(10)).await;
+                            }
+                        } => {},
+                        _ = shutdown.cancelled() => return,
+                    }
+
                     let mut write_buffer = write_buffer
                         .try_lock()
                         .expect("no streams should exist at this point");
@@ -1535,6 +1567,7 @@ mod tests {
         let db_captured = Arc::clone(&db);
         let join_handle =
             tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+        db.allow_write_buffer_read();

         let query = "select * from cpu";

@@ -1674,6 +1707,7 @@ mod tests {
         let db_captured = Arc::clone(&db);
         let join_handle =
             tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+        db.allow_write_buffer_read();

         // check: after a while the error should be reported in the database's metrics
         let t_0 = Instant::now();
@@ -2783,6 +2817,7 @@ mod tests {
         let db_captured = Arc::clone(&db);
         let join_handle =
             tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+        db.allow_write_buffer_read();

         // check: after a while the persistence windows should have the expected data
         let t_0 = Instant::now();
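Note: the new write buffer gate is just an `AtomicBool` polled inside a `tokio::select!` that also honors shutdown. The same shape in isolation, as a sketch (assumes `tokio` with the macros and time features plus `tokio-util`; `Gate` is a made-up name — in the patch the flag lives directly on `Db` as `no_write_buffer_read`, and the 10 ms poll interval is copied from it):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    struct Gate {
        closed: AtomicBool,
    }

    impl Gate {
        // Corresponds to `Db::allow_write_buffer_read`.
        fn open(&self) {
            self.closed.store(false, Ordering::SeqCst);
        }

        // Poll the flag until it is cleared, or bail out on shutdown --
        // the same shape as the `tokio::select!` added to the background
        // worker. Returns `true` when reading is allowed, `false` on
        // shutdown.
        async fn wait_until_open(&self, shutdown: &CancellationToken) -> bool {
            tokio::select! {
                _ = async {
                    while self.closed.load(Ordering::SeqCst) {
                        tokio::time::sleep(Duration::from_millis(10)).await;
                    }
                } => true,
                _ = shutdown.cancelled() => false,
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let gate = Arc::new(Gate {
            closed: AtomicBool::new(true),
        });
        let shutdown = CancellationToken::new();

        let gate_captured = Arc::clone(&gate);
        let shutdown_captured = shutdown.clone();
        let worker = tokio::spawn(async move {
            if gate_captured.wait_until_open(&shutdown_captured).await {
                // ... start consuming from the write buffer here ...
            }
        });

        gate.open();
        worker.await.unwrap();
    }

Starting the flag as `true` means a freshly created `Db` never consumes the write buffer on its own, which is what lets the worker run (for compaction) during replay without racing the replay logic.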
diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs
index 2995442002..07f97ced64 100644
--- a/server/src/db/replay.rs
+++ b/server/src/db/replay.rs
@@ -231,6 +231,7 @@ mod tests {
     };
     use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk};
    use test_helpers::assert_contains;
+    use tokio::task::JoinHandle;
     use tokio_util::sync::CancellationToken;
     use write_buffer::{
         config::WriteBufferConfig,
@@ -286,6 +287,9 @@ mod tests {
         Ingest(Vec<TestSequencedEntry>),

         /// Restart DB
+        ///
+        /// Background loop is started as well, but neither persistence nor write buffer reads are allowed until
+        /// [`Await`](Self::Await) is used.
         Restart,

         /// Perform replay
@@ -301,7 +305,7 @@ mod tests {

         /// Wait that background loop generates desired state (all checks pass).
         ///
-        /// Background loop is started before the check and stopped afterwards.
+        /// Persistence and write buffer reads are enabled in preparation for this step.
         Await(Vec<Check>),
     }
@@ -313,17 +317,13 @@ mod tests {
         /// What to do in which order.
         ///
         /// # Serialization
-        /// The execution of the entire test is purely serial with the exception of [`Await`](Step::Await) (see
-        /// next section). That means that nothing happens concurrently during each step. Every step is finished and
+        /// Every step is finished and
         /// checked for errors before the next is started (e.g. [`Replay`](Step::Replay) is fully executed and
         /// it is ensured that there were no errors before a subsequent [`Assert`](Step::Assert) is evaluated).
-        /// The database background worker is NOT active during any non-[`Await`](Step::Await)
         ///
-        /// # Await
-        /// Sometimes the background worker is needed to perform something, e.g. to consume some data from the write
-        /// buffer. In that case [`Await`](Step::Await) can be used. During this check (and only during this
-        /// check) the background worker is active and the checks passed to [`Await`](Step::Await) are
-        /// evaluated until they succeed. The background worker is stopped before the next test step is evaluated.
+        /// # Await / Background Worker
+        /// The database background worker is started when the DB is created, but persistence and reads from the
+        /// write buffer are disabled until [`Await`](Step::Await) is used.
         steps: Vec<Step>,
     }

@@ -340,7 +340,7 @@ mod tests {
             };
             let write_buffer_state =
                 MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers);
-            let mut test_db = Self::create_test_db(
+            let (mut test_db, mut shutdown, mut join_handle) = Self::create_test_db(
                 Arc::clone(&object_store),
                 write_buffer_state.clone(),
                 server_id,
@@ -360,11 +360,15 @@ mod tests {
                         }
                     }
                    Step::Restart => {
-                        // first drop old DB
+                        // stop background worker
+                        shutdown.cancel();
+                        join_handle.await.unwrap();
+
+                        // drop old DB
                         drop(test_db);

                         // then create new one
-                        test_db = Self::create_test_db(
+                        let (test_db_tmp, shutdown_tmp, join_handle_tmp) = Self::create_test_db(
                             Arc::clone(&object_store),
                             write_buffer_state.clone(),
                             server_id,
@@ -372,11 +376,14 @@ mod tests {
                             db_name,
                             partition_template.clone(),
                         )
                         .await;
+                        test_db = test_db_tmp;
+                        shutdown = shutdown_tmp;
+                        join_handle = join_handle_tmp;
                     }
                     Step::Replay => {
-                        test_db
-                            .db
-                            .perform_replay(&test_db.replay_plan, false)
+                        let db = &test_db.db;
+
+                        db.perform_replay(&test_db.replay_plan, false)
                             .await
                             .unwrap();
@@ -413,14 +420,8 @@ mod tests {
                     }
                     Step::Await(checks) => {
                         let db = &test_db.db;
-
-                        // start background worker
-                        let shutdown: CancellationToken = Default::default();
-                        let shutdown_captured = shutdown.clone();
-                        let db_captured = Arc::clone(db);
-                        let join_handle = tokio::spawn(async move {
-                            db_captured.background_worker(shutdown_captured).await
-                        });
+                        db.unsuppress_persistence().await;
+                        db.allow_write_buffer_read();

                         // wait until checks pass
                         let t_0 = Instant::now();
@@ -439,10 +440,6 @@ mod tests {
                             }
                             tokio::time::sleep(Duration::from_millis(100)).await;
                         }
-
-                        // stop background worker
-                        shutdown.cancel();
-                        join_handle.await.unwrap();
                     }
                 }
             }
@@ -471,9 +468,9 @@ mod tests {
            server_id: ServerId,
            db_name: &'static str,
            partition_template: PartitionTemplate,
-        ) -> TestDb {
+        ) -> (TestDb, CancellationToken, JoinHandle<()>) {
            let write_buffer = MockBufferForReading::new(write_buffer_state);
-            TestDb::builder()
+            let test_db = TestDb::builder()
                .object_store(object_store)
                .server_id(server_id)
                .write_buffer(WriteBufferConfig::Reading(Arc::new(
@@ -486,7 +483,16 @@ mod tests {
                .partition_template(partition_template)
                .db_name(db_name)
                .build()
-                .await
+                .await;
+
+            // start background worker
+            let shutdown: CancellationToken = Default::default();
+            let shutdown_captured = shutdown.clone();
+            let db_captured = Arc::clone(&test_db.db);
+            let join_handle =
+                tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+
+            (test_db, shutdown, join_handle)
        }

        /// Evaluates given checks.
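Note: the harness change follows one idea — the worker's lifetime must match the `TestDb`'s lifetime, so `create_test_db` now hands back everything needed to stop it. A minimal sketch of the same restart discipline (assumes `tokio` and `tokio-util`; `start_worker` is an illustrative name, not the harness API):

    use tokio::task::JoinHandle;
    use tokio_util::sync::CancellationToken;

    // Stand-in for the worker started by `create_test_db`.
    fn start_worker() -> (CancellationToken, JoinHandle<()>) {
        let shutdown = CancellationToken::new();
        let captured = shutdown.clone();
        let handle = tokio::spawn(async move { captured.cancelled().await });
        (shutdown, handle)
    }

    #[tokio::main]
    async fn main() {
        let (mut shutdown, mut join_handle) = start_worker();

        // `Step::Restart`: stop the old worker *before* dropping the old
        // DB, then start a fresh one -- the cancel/join/re-create sequence
        // in the hunk above.
        shutdown.cancel();
        join_handle.await.unwrap();

        let (shutdown_tmp, join_handle_tmp) = start_worker();
        shutdown = shutdown_tmp;
        join_handle = join_handle_tmp;

        // final teardown
        shutdown.cancel();
        join_handle.await.unwrap();
    }

With the worker always alive, `Await` no longer spawns and reaps a temporary worker; it merely flips the persistence and write-buffer-read gates and waits for the checks to pass.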
diff --git a/server/src/init.rs b/server/src/init.rs
index 60093b6915..0750a3c192 100644
--- a/server/src/init.rs
+++ b/server/src/init.rs
@@ -353,6 +353,8 @@ async fn try_advance_database_init_process(
                 .await
                 .map_err(Box::new)
                 .context(ReplayError)?;
+            db.unsuppress_persistence().await;
+            db.allow_write_buffer_read();

             handle
                 .advance_init()
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 409365fc9c..7501788293 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -593,6 +593,11 @@
             db_reservation.advance_replay(preserved_catalog, catalog, replay_plan, write_buffer)?;

             // no actual replay required
+            let db = db_reservation
+                .db_any_state()
+                .expect("DB should exist at this point");
+            db.unsuppress_persistence().await;
+            db.allow_write_buffer_read();
             db_reservation.advance_init()?;

             // ready to commit
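Note: taken together, the initialization order after this patch is — spawn the worker (compaction active, persistence and write buffer reads suppressed), perform replay, then release both gates and advance to `Initialized`. As a sketch (the `Db` below is a stub; the method names follow the patch, the bodies and the argument-free `perform_replay` do not):

    struct Db;

    impl Db {
        async fn perform_replay(&self) { /* replay from the write buffer */ }
        async fn unsuppress_persistence(&self) { /* lifecycle may persist now */ }
        fn allow_write_buffer_read(&self) { /* worker may consume the write buffer */ }
    }

    #[tokio::main]
    async fn main() {
        let db = Db;

        // The background worker is already running here (spawned in
        // `advance_replay`), so compaction may proceed concurrently with
        // replay while persistence and write buffer reads stay suppressed.
        db.perform_replay().await;

        // Only once replay has finished are both gates released:
        db.unsuppress_persistence().await;
        db.allow_write_buffer_read();
    }

Keeping persistence suppressed until replay completes avoids persisting partially replayed data, while still letting the already-running worker compact in-memory chunks — the stated goal of the commit.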