From ac67caf7a1b20b14e7a47a4cea756c102aaa7cee Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 2 Aug 2021 17:21:22 +0100 Subject: [PATCH] refactor: rename ReadLock to Freezable (#2145) * refactor: rename ReadLock to Freezable and make exclusive * chore: review feedback --- internal_types/src/freezable.rs | 124 ++++++++++++++++++ internal_types/src/guard.rs | 72 ---------- internal_types/src/lib.rs | 2 +- .../src/persistence_windows.rs | 16 +-- 4 files changed, 132 insertions(+), 82 deletions(-) create mode 100644 internal_types/src/freezable.rs delete mode 100644 internal_types/src/guard.rs diff --git a/internal_types/src/freezable.rs b/internal_types/src/freezable.rs new file mode 100644 index 0000000000..9ddfc4b640 --- /dev/null +++ b/internal_types/src/freezable.rs @@ -0,0 +1,124 @@ +use std::ops::Deref; +use std::sync::{Arc, Weak}; + +/// A wrapper around a type `T` that can be frozen with `Freezable::try_freeze`, preventing +/// modification of the contained `T` until the returned `FreezeHandle` is dropped +/// +/// ``` +/// use internal_types::freezable::Freezable; +/// let mut data = Freezable::new(0); +/// assert!(data.get_mut().is_some()); +/// +/// *data.get_mut().unwrap() = 32; +/// assert_eq!(*data, 32); +/// +/// let handle = data.try_freeze().unwrap(); +/// assert!(data.get_mut().is_none()); +/// +/// // Cannot get another guard as one already exists +/// assert!(data.try_freeze().is_none()); +/// +/// *data.unfreeze(handle) = 34; +/// assert_eq!(*data, 34); +/// +/// // Can get mutable access as guards dropped +/// assert!(data.get_mut().is_some()); +/// +/// // Can get another guard +/// assert!(data.try_freeze().is_some()); +/// ``` +/// +/// This is useful for a common 3-stage transaction where nothing should be able to +/// start another transaction on the locked resource whilst this transaction is +/// in-progress, but locks are not held for the entirety of the transaction: +/// +/// 1. Exclusive access is obtained to a resource and work is identified +/// 2. Locks are dropped and some async work performed +/// 3. Locks are re-acquired and the result of async work committed +/// +/// ``` +/// use internal_types::freezable::Freezable; +/// use std::sync::RwLock; +/// +/// let lockable = RwLock::new(Freezable::new(23)); +/// +/// // Start transaction +/// let handle = { +/// let mut locked = lockable.write().unwrap(); +/// locked.try_freeze().expect("other transaction in progress") +/// }; +/// +/// // The contained data cannot be modified +/// assert!(lockable.write().unwrap().get_mut().is_none()); +/// // But it can still be read +/// assert_eq!(**lockable.read().unwrap(), 23); +/// +/// // -------------- +/// // Do async work +/// // -------------- +/// +/// // Finish transaction +/// { +/// let mut locked = lockable.write().unwrap(); +/// *locked.unfreeze(handle) = 45; +/// } +/// ``` +/// +#[derive(Debug)] +pub struct Freezable(Arc); + +impl Freezable { + pub fn new(payload: T) -> Self { + Self(Arc::new(payload)) + } + + /// Returns a `FreezeHandle` that prevents modification + /// of the contents of `Freezable` until it is dropped + /// + /// Returns None if the object is already frozen + pub fn try_freeze(&mut self) -> Option> { + // Verify exclusive + self.get_mut()?; + Some(FreezeHandle(Arc::downgrade(&self.0))) + } + + /// Unfreezes this instance, returning a mutable reference to + /// its contained data + pub fn unfreeze(&mut self, handle: FreezeHandle) -> &mut T { + assert!( + std::ptr::eq(&*self.0, handle.0.as_ptr()), + "provided FreezeHandle is not for this instance" + ); + std::mem::drop(handle); + // Just dropped `FreezeHandle` so should be valid + self.get_mut().unwrap() + } + + /// Try to get mutable access to the data + /// + /// Returns `None` if this instance is frozen + pub fn get_mut(&mut self) -> Option<&mut T> { + Arc::get_mut(&mut self.0) + } +} + +impl Deref for Freezable { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// `FreezeHandle` is returned by `Freezable::try_freeze` and prevents modification +/// of the contents of `Freezable`. Importantly: +/// +/// * `FreezeHandle` is `Send` and can be held across an await point +/// * `FreezeHandle` will unfreeze on `Drop` making it panic safe +/// * `FreezeHandle` is not coupled to the lifetime of the `Freezable` +/// +/// The last of these is critical to allowing the `FreezeHandle` +/// to outlive the `&mut Freezable` from which it was created +/// +#[derive(Debug)] +pub struct FreezeHandle(Weak); diff --git a/internal_types/src/guard.rs b/internal_types/src/guard.rs deleted file mode 100644 index 143d279e4b..0000000000 --- a/internal_types/src/guard.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::ops::Deref; -use std::sync::{Arc, Weak}; - -/// A wrapper around a type T that allows acquiring `ReadGuard` that prevents -/// modification to the data within the `ReadLock` until the `ReadGuard` is dropped -/// -/// ``` -/// use internal_types::guard::ReadLock; -/// let mut data = ReadLock::new(0); -/// assert!(data.get_mut().is_some()); -/// -/// *data.get_mut().unwrap() = 32; -/// assert_eq!(*data, 32); -/// -/// let guard = data.lock(); -/// assert!(data.get_mut().is_none()); -/// -/// let guard2 = data.lock(); -/// std::mem::drop(guard); -/// -/// assert!(data.get_mut().is_none()); -/// std::mem::drop(guard2); -/// -/// // Can get mutable access as guards dropped -/// assert!(data.get_mut().is_some()); -/// ``` -/// -/// The construction relies on the borrow checker to prevent creating -/// new `ReadGuard` whilst there is an active mutable borrow -/// -/// ```compile_fail -/// use internal_types::guard::ReadLock; -/// let mut data = ReadLock::new(0); -/// let mut_borrow = data.get_mut(); -/// -/// data.lock(); // Shouldn't compile -/// -/// std::mem::drop(mut_borrow); -/// ``` -/// -#[derive(Debug)] -pub struct ReadLock(Arc); - -impl ReadLock { - pub fn new(payload: T) -> Self { - Self(Arc::new(payload)) - } - - /// Returns a new `ReadGuard` that prevents modification - /// of the contents of `ReadLock` until dropped - pub fn lock(&self) -> ReadGuard { - ReadGuard(Arc::downgrade(&self.0)) - } - - /// Try to get access to the mutable data - /// - /// Returns `None` if there are outstanding `ReadGuard` - pub fn get_mut(&mut self) -> Option<&mut T> { - Arc::get_mut(&mut self.0) - } -} - -impl Deref for ReadLock { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -#[derive(Debug)] -pub struct ReadGuard(Weak); diff --git a/internal_types/src/lib.rs b/internal_types/src/lib.rs index 41fd3b006d..2e6114504f 100644 --- a/internal_types/src/lib.rs +++ b/internal_types/src/lib.rs @@ -8,7 +8,7 @@ pub mod access; pub mod atomic_instant; -pub mod guard; +pub mod freezable; pub mod once; pub mod schema; pub mod selection; diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index e272ec2af3..8a3912ea51 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -9,7 +9,7 @@ use chrono::{DateTime, TimeZone, Utc}; use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary}; use entry::Sequence; -use internal_types::guard::{ReadGuard, ReadLock}; +use internal_types::freezable::{Freezable, FreezeHandle}; use crate::min_max_sequence::MinMaxSequence; use crate::{checkpoint::PartitionCheckpoint, min_max_sequence::OptionalMinMaxSequence}; @@ -37,7 +37,7 @@ const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30); /// that arrived more than late_arrival_period seconds ago, as determined by wall clock time" #[derive(Debug)] pub struct PersistenceWindows { - persistable: ReadLock>, + persistable: Freezable>, closed: VecDeque, open: Option, @@ -66,7 +66,7 @@ pub struct PersistenceWindows { /// #[derive(Debug)] pub struct FlushHandle { - guard: ReadGuard>, + handle: FreezeHandle>, /// The number of closed windows at the time of the handle's creation /// /// This identifies the windows that can have their @@ -113,7 +113,7 @@ impl PersistenceWindows { let created_at_instant = Instant::now(); Self { - persistable: ReadLock::new(None), + persistable: Freezable::new(None), closed: VecDeque::with_capacity(closed_window_count as usize), open: None, addr, @@ -270,7 +270,7 @@ impl PersistenceWindows { /// /// Returns `None` if there is an outstanding handle pub fn flush_handle(&mut self, now: Instant) -> Option { - // Verify no active flush handles + // Verify no active flush handles before closing open window self.persistable.get_mut()?; // Close current open window if any @@ -282,7 +282,7 @@ impl PersistenceWindows { self.rotate(now); Some(FlushHandle { - guard: self.persistable.lock(), + handle: self.persistable.try_freeze()?, closed_count: self.closed.len(), addr: self.addr.clone(), timestamp: self.persistable.as_ref()?.max_time, @@ -294,7 +294,6 @@ impl PersistenceWindows { pub fn flush(&mut self, handle: FlushHandle) { let closed_count = handle.closed_count; let timestamp = handle.timestamp; - std::mem::drop(handle); assert!( self.closed.len() >= closed_count, @@ -303,8 +302,7 @@ impl PersistenceWindows { let persistable = self .persistable - .get_mut() - .expect("expected no active locks") + .unfreeze(handle.handle) .take() .expect("expected persistable window");