From 5b69bb0d72fea17ad10231e8cd8865b8a1e7e16a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 12 Oct 2021 18:34:16 +0100 Subject: [PATCH] feat: reduce lifecycle lock scope (#2242) (#2810) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- lifecycle/src/policy.rs | 2 +- server/src/database.rs | 2 +- server/src/db.rs | 23 ++++---- server/src/db/replay.rs | 2 +- tracker/src/task.rs | 127 ++++++++++++++++++++++------------------ 5 files changed, 85 insertions(+), 71 deletions(-) diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 993e568f7a..85817ff145 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -439,7 +439,7 @@ where /// The core policy logic /// /// Returns a future that resolves when this method should be called next - pub fn check_for_work(&mut self, now: DateTime) -> BoxFuture<'_, ()> { + pub fn check_for_work(&mut self, now: DateTime) -> BoxFuture<'static, ()> { // Any time-consuming work should be spawned as tokio tasks and not // run directly within this loop diff --git a/server/src/database.rs b/server/src/database.rs index 2df42205cb..820c083d82 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1133,7 +1133,7 @@ impl DatabaseStateCatalogLoaded { let db = Arc::clone(&self.db); // TODO: Pull write buffer and lifecycle out of Db - db.unsuppress_persistence().await; + db.unsuppress_persistence(); let rules = self.provided_rules.rules(); let write_buffer_factory = shared.application.write_buffer_factory(); diff --git a/server/src/db.rs b/server/src/db.rs index 92c2f7922d..be847d80ea 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -298,7 +298,7 @@ pub struct Db { /// This is stored here for the following reasons: /// - to control the persistence suppression via a [`Db::unsuppress_persistence`] /// - to keep the lifecycle state (e.g. the number of running compactions) around - lifecycle_policy: tokio::sync::Mutex>>, + lifecycle_policy: Mutex>>, time_provider: Arc, @@ -360,7 +360,7 @@ impl Db { worker_iterations_delete_predicate_preservation: AtomicUsize::new(0), write_buffer_producer: database_to_commit.write_buffer_producer, cleanup_lock: Default::default(), - lifecycle_policy: tokio::sync::Mutex::new(None), + lifecycle_policy: Mutex::new(None), time_provider: database_to_commit.time_provider, delete_predicates_mailbox: Default::default(), persisted_chunk_id_override: Default::default(), @@ -378,8 +378,8 @@ impl Db { } /// Allow persistence if database rules all it. - pub async fn unsuppress_persistence(&self) { - let mut guard = self.lifecycle_policy.lock().await; + pub fn unsuppress_persistence(&self) { + let mut guard = self.lifecycle_policy.lock(); let policy = guard .as_mut() .expect("lifecycle policy should be initialized"); @@ -819,15 +819,16 @@ impl Db { loop { self.worker_iterations_lifecycle .fetch_add(1, Ordering::Relaxed); - let mut guard = self.lifecycle_policy.lock().await; - let policy = guard - .as_mut() - .expect("lifecycle policy should be initialized"); + let fut = { + let mut guard = self.lifecycle_policy.lock(); + let policy = guard + .as_mut() + .expect("lifecycle policy should be initialized"); - policy - .check_for_work(self.time_provider.now().date_time()) - .await + policy.check_for_work(self.time_provider.now().date_time()) + }; + fut.await } }; diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index d77b29277d..b6f13762ed 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -711,7 +711,7 @@ mod tests { } let db = &test_db.db; - db.unsuppress_persistence().await; + db.unsuppress_persistence(); // wait until checks pass let t_0 = Instant::now(); diff --git a/tracker/src/task.rs b/tracker/src/task.rs index 805ac59c2d..5451dbb075 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -116,6 +116,58 @@ struct TrackerState { notify: Notify, } +impl TrackerState { + fn get_status(&self) -> TaskStatus { + // The atomic decrement in TrackerRegistration::drop has release semantics + // acquire here ensures that if a thread observes the tracker to have + // no pending_registrations it cannot subsequently observe pending_futures + // to increase. If it could, observing pending_futures==0 would be insufficient + // to conclude there are no outstanding futures + let pending_registrations = self.pending_registrations.load(Ordering::Acquire); + + // The atomic decrement in TrackedFuture::drop has release semantics + // acquire therefore ensures that if a thread observes the completion of + // a TrackedFuture, it is guaranteed to see its updates (e.g. wall_nanos) + let pending_futures = self.pending_futures.load(Ordering::Acquire); + + match (pending_registrations == 0, pending_futures == 0) { + (false, _) => TaskStatus::Creating, + (true, false) => TaskStatus::Running { + total_count: self.created_futures.load(Ordering::Relaxed), + pending_count: self.pending_futures.load(Ordering::Relaxed), + cpu_nanos: self.cpu_nanos.load(Ordering::Relaxed), + }, + (true, true) => { + let total_count = self.created_futures.load(Ordering::Relaxed); + let success_count = self.ok_futures.load(Ordering::Relaxed); + let error_count = self.err_futures.load(Ordering::Relaxed); + let cancelled_count = self.cancelled_futures.load(Ordering::Relaxed); + + // Failure of this would imply a future reported its completion status multiple + // times or a future was created without incrementing created_futures. + // Both of these should be impossible + let dropped_count = total_count + .checked_sub(success_count + error_count + cancelled_count) + .expect("invalid tracker state"); + + TaskStatus::Complete { + total_count, + success_count, + error_count, + cancelled_count, + dropped_count, + cpu_nanos: self.cpu_nanos.load(Ordering::Relaxed), + wall_nanos: self.wall_nanos.load(Ordering::Relaxed), + } + } + } + } + + fn is_complete(&self) -> bool { + matches!(self.get_status(), TaskStatus::Complete { .. }) + } +} + /// Returns a summary of the task execution #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum TaskResult { @@ -323,54 +375,12 @@ where /// Returns true if all futures associated with this tracker have /// been dropped and no more can be created pub fn is_complete(&self) -> bool { - matches!(self.get_status(), TaskStatus::Complete { .. }) + self.state.is_complete() } /// Gets the status of the tracker pub fn get_status(&self) -> TaskStatus { - // The atomic decrement in TrackerRegistration::drop has release semantics - // acquire here ensures that if a thread observes the tracker to have - // no pending_registrations it cannot subsequently observe pending_futures - // to increase. If it could, observing pending_futures==0 would be insufficient - // to conclude there are no outstanding futures - let pending_registrations = self.state.pending_registrations.load(Ordering::Acquire); - - // The atomic decrement in TrackedFuture::drop has release semantics - // acquire therefore ensures that if a thread observes the completion of - // a TrackedFuture, it is guaranteed to see its updates (e.g. wall_nanos) - let pending_futures = self.state.pending_futures.load(Ordering::Acquire); - - match (pending_registrations == 0, pending_futures == 0) { - (false, _) => TaskStatus::Creating, - (true, false) => TaskStatus::Running { - total_count: self.state.created_futures.load(Ordering::Relaxed), - pending_count: self.state.pending_futures.load(Ordering::Relaxed), - cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed), - }, - (true, true) => { - let total_count = self.state.created_futures.load(Ordering::Relaxed); - let success_count = self.state.ok_futures.load(Ordering::Relaxed); - let error_count = self.state.err_futures.load(Ordering::Relaxed); - let cancelled_count = self.state.cancelled_futures.load(Ordering::Relaxed); - - // Failure of this would imply a future reported its completion status multiple - // times or a future was created without incrementing created_futures. - // Both of these should be impossible - let dropped_count = total_count - .checked_sub(success_count + error_count + cancelled_count) - .expect("invalid tracker state"); - - TaskStatus::Complete { - total_count, - success_count, - error_count, - cancelled_count, - dropped_count, - cpu_nanos: self.state.cpu_nanos.load(Ordering::Relaxed), - wall_nanos: self.state.wall_nanos.load(Ordering::Relaxed), - } - } - } + self.state.get_status() } /// Returns the instant the tracker was created @@ -385,21 +395,24 @@ where /// Blocks until all futures associated with the tracker have been /// dropped and no more can be created - pub async fn join(&self) { - // Notify is notified when pending_futures hits 0 AND when pending_registrations - // hits 0. In almost all cases join won't be called before pending_registrations - // has already hit 0, but in the extremely rare case this occurs the loop - // handles the spurious wakeup - loop { - // Request notification before checking if complete - // to avoid a race condition - let notify = self.state.notify.notified(); + pub fn join(&self) -> impl std::future::Future { + let state = Arc::clone(&self.state); + async move { + // Notify is notified when pending_futures hits 0 AND when pending_registrations + // hits 0. In almost all cases join won't be called before pending_registrations + // has already hit 0, but in the extremely rare case this occurs the loop + // handles the spurious wakeup + loop { + // Request notification before checking if complete + // to avoid a race condition + let notify = state.notify.notified(); - if self.is_complete() { - return; + if state.is_complete() { + return; + } + + notify.await } - - notify.await } } }