refactor: rename ReadLock to Freezable (#2145)
* refactor: rename ReadLock to Freezable and make exclusive * chore: review feedbackpull/24376/head
parent
54af62b8b6
commit
ac67caf7a1
|
@ -0,0 +1,124 @@
|
||||||
|
use std::ops::Deref;
|
||||||
|
use std::sync::{Arc, Weak};
|
||||||
|
|
||||||
|
/// A wrapper around a type `T` that can be frozen with `Freezable::try_freeze`, preventing
|
||||||
|
/// modification of the contained `T` until the returned `FreezeHandle` is dropped
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use internal_types::freezable::Freezable;
|
||||||
|
/// let mut data = Freezable::new(0);
|
||||||
|
/// assert!(data.get_mut().is_some());
|
||||||
|
///
|
||||||
|
/// *data.get_mut().unwrap() = 32;
|
||||||
|
/// assert_eq!(*data, 32);
|
||||||
|
///
|
||||||
|
/// let handle = data.try_freeze().unwrap();
|
||||||
|
/// assert!(data.get_mut().is_none());
|
||||||
|
///
|
||||||
|
/// // Cannot get another guard as one already exists
|
||||||
|
/// assert!(data.try_freeze().is_none());
|
||||||
|
///
|
||||||
|
/// *data.unfreeze(handle) = 34;
|
||||||
|
/// assert_eq!(*data, 34);
|
||||||
|
///
|
||||||
|
/// // Can get mutable access as guards dropped
|
||||||
|
/// assert!(data.get_mut().is_some());
|
||||||
|
///
|
||||||
|
/// // Can get another guard
|
||||||
|
/// assert!(data.try_freeze().is_some());
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// This is useful for a common 3-stage transaction where nothing should be able to
|
||||||
|
/// start another transaction on the locked resource whilst this transaction is
|
||||||
|
/// in-progress, but locks are not held for the entirety of the transaction:
|
||||||
|
///
|
||||||
|
/// 1. Exclusive access is obtained to a resource and work is identified
|
||||||
|
/// 2. Locks are dropped and some async work performed
|
||||||
|
/// 3. Locks are re-acquired and the result of async work committed
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use internal_types::freezable::Freezable;
|
||||||
|
/// use std::sync::RwLock;
|
||||||
|
///
|
||||||
|
/// let lockable = RwLock::new(Freezable::new(23));
|
||||||
|
///
|
||||||
|
/// // Start transaction
|
||||||
|
/// let handle = {
|
||||||
|
/// let mut locked = lockable.write().unwrap();
|
||||||
|
/// locked.try_freeze().expect("other transaction in progress")
|
||||||
|
/// };
|
||||||
|
///
|
||||||
|
/// // The contained data cannot be modified
|
||||||
|
/// assert!(lockable.write().unwrap().get_mut().is_none());
|
||||||
|
/// // But it can still be read
|
||||||
|
/// assert_eq!(**lockable.read().unwrap(), 23);
|
||||||
|
///
|
||||||
|
/// // --------------
|
||||||
|
/// // Do async work
|
||||||
|
/// // --------------
|
||||||
|
///
|
||||||
|
/// // Finish transaction
|
||||||
|
/// {
|
||||||
|
/// let mut locked = lockable.write().unwrap();
|
||||||
|
/// *locked.unfreeze(handle) = 45;
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Freezable<T>(Arc<T>);
|
||||||
|
|
||||||
|
impl<T> Freezable<T> {
|
||||||
|
pub fn new(payload: T) -> Self {
|
||||||
|
Self(Arc::new(payload))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a `FreezeHandle` that prevents modification
|
||||||
|
/// of the contents of `Freezable` until it is dropped
|
||||||
|
///
|
||||||
|
/// Returns None if the object is already frozen
|
||||||
|
pub fn try_freeze(&mut self) -> Option<FreezeHandle<T>> {
|
||||||
|
// Verify exclusive
|
||||||
|
self.get_mut()?;
|
||||||
|
Some(FreezeHandle(Arc::downgrade(&self.0)))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unfreezes this instance, returning a mutable reference to
|
||||||
|
/// its contained data
|
||||||
|
pub fn unfreeze(&mut self, handle: FreezeHandle<T>) -> &mut T {
|
||||||
|
assert!(
|
||||||
|
std::ptr::eq(&*self.0, handle.0.as_ptr()),
|
||||||
|
"provided FreezeHandle is not for this instance"
|
||||||
|
);
|
||||||
|
std::mem::drop(handle);
|
||||||
|
// Just dropped `FreezeHandle` so should be valid
|
||||||
|
self.get_mut().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to get mutable access to the data
|
||||||
|
///
|
||||||
|
/// Returns `None` if this instance is frozen
|
||||||
|
pub fn get_mut(&mut self) -> Option<&mut T> {
|
||||||
|
Arc::get_mut(&mut self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for Freezable<T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `FreezeHandle` is returned by `Freezable::try_freeze` and prevents modification
|
||||||
|
/// of the contents of `Freezable`. Importantly:
|
||||||
|
///
|
||||||
|
/// * `FreezeHandle` is `Send` and can be held across an await point
|
||||||
|
/// * `FreezeHandle` will unfreeze on `Drop` making it panic safe
|
||||||
|
/// * `FreezeHandle` is not coupled to the lifetime of the `Freezable`
|
||||||
|
///
|
||||||
|
/// The last of these is critical to allowing the `FreezeHandle`
|
||||||
|
/// to outlive the `&mut Freezable` from which it was created
|
||||||
|
///
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct FreezeHandle<T>(Weak<T>);
|
|
@ -1,72 +0,0 @@
|
||||||
use std::ops::Deref;
|
|
||||||
use std::sync::{Arc, Weak};
|
|
||||||
|
|
||||||
/// A wrapper around a type T that allows acquiring `ReadGuard` that prevents
|
|
||||||
/// modification to the data within the `ReadLock` until the `ReadGuard` is dropped
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// use internal_types::guard::ReadLock;
|
|
||||||
/// let mut data = ReadLock::new(0);
|
|
||||||
/// assert!(data.get_mut().is_some());
|
|
||||||
///
|
|
||||||
/// *data.get_mut().unwrap() = 32;
|
|
||||||
/// assert_eq!(*data, 32);
|
|
||||||
///
|
|
||||||
/// let guard = data.lock();
|
|
||||||
/// assert!(data.get_mut().is_none());
|
|
||||||
///
|
|
||||||
/// let guard2 = data.lock();
|
|
||||||
/// std::mem::drop(guard);
|
|
||||||
///
|
|
||||||
/// assert!(data.get_mut().is_none());
|
|
||||||
/// std::mem::drop(guard2);
|
|
||||||
///
|
|
||||||
/// // Can get mutable access as guards dropped
|
|
||||||
/// assert!(data.get_mut().is_some());
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// The construction relies on the borrow checker to prevent creating
|
|
||||||
/// new `ReadGuard` whilst there is an active mutable borrow
|
|
||||||
///
|
|
||||||
/// ```compile_fail
|
|
||||||
/// use internal_types::guard::ReadLock;
|
|
||||||
/// let mut data = ReadLock::new(0);
|
|
||||||
/// let mut_borrow = data.get_mut();
|
|
||||||
///
|
|
||||||
/// data.lock(); // Shouldn't compile
|
|
||||||
///
|
|
||||||
/// std::mem::drop(mut_borrow);
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ReadLock<T>(Arc<T>);
|
|
||||||
|
|
||||||
impl<T> ReadLock<T> {
|
|
||||||
pub fn new(payload: T) -> Self {
|
|
||||||
Self(Arc::new(payload))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a new `ReadGuard` that prevents modification
|
|
||||||
/// of the contents of `ReadLock` until dropped
|
|
||||||
pub fn lock(&self) -> ReadGuard<T> {
|
|
||||||
ReadGuard(Arc::downgrade(&self.0))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to get access to the mutable data
|
|
||||||
///
|
|
||||||
/// Returns `None` if there are outstanding `ReadGuard`
|
|
||||||
pub fn get_mut(&mut self) -> Option<&mut T> {
|
|
||||||
Arc::get_mut(&mut self.0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Deref for ReadLock<T> {
|
|
||||||
type Target = T;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ReadGuard<T>(Weak<T>);
|
|
|
@ -8,7 +8,7 @@
|
||||||
|
|
||||||
pub mod access;
|
pub mod access;
|
||||||
pub mod atomic_instant;
|
pub mod atomic_instant;
|
||||||
pub mod guard;
|
pub mod freezable;
|
||||||
pub mod once;
|
pub mod once;
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
pub mod selection;
|
pub mod selection;
|
||||||
|
|
|
@ -9,7 +9,7 @@ use chrono::{DateTime, TimeZone, Utc};
|
||||||
|
|
||||||
use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary};
|
use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary};
|
||||||
use entry::Sequence;
|
use entry::Sequence;
|
||||||
use internal_types::guard::{ReadGuard, ReadLock};
|
use internal_types::freezable::{Freezable, FreezeHandle};
|
||||||
|
|
||||||
use crate::min_max_sequence::MinMaxSequence;
|
use crate::min_max_sequence::MinMaxSequence;
|
||||||
use crate::{checkpoint::PartitionCheckpoint, min_max_sequence::OptionalMinMaxSequence};
|
use crate::{checkpoint::PartitionCheckpoint, min_max_sequence::OptionalMinMaxSequence};
|
||||||
|
@ -37,7 +37,7 @@ const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30);
|
||||||
/// that arrived more than late_arrival_period seconds ago, as determined by wall clock time"
|
/// that arrived more than late_arrival_period seconds ago, as determined by wall clock time"
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PersistenceWindows {
|
pub struct PersistenceWindows {
|
||||||
persistable: ReadLock<Option<Window>>,
|
persistable: Freezable<Option<Window>>,
|
||||||
closed: VecDeque<Window>,
|
closed: VecDeque<Window>,
|
||||||
open: Option<Window>,
|
open: Option<Window>,
|
||||||
|
|
||||||
|
@ -66,7 +66,7 @@ pub struct PersistenceWindows {
|
||||||
///
|
///
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct FlushHandle {
|
pub struct FlushHandle {
|
||||||
guard: ReadGuard<Option<Window>>,
|
handle: FreezeHandle<Option<Window>>,
|
||||||
/// The number of closed windows at the time of the handle's creation
|
/// The number of closed windows at the time of the handle's creation
|
||||||
///
|
///
|
||||||
/// This identifies the windows that can have their
|
/// This identifies the windows that can have their
|
||||||
|
@ -113,7 +113,7 @@ impl PersistenceWindows {
|
||||||
let created_at_instant = Instant::now();
|
let created_at_instant = Instant::now();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
persistable: ReadLock::new(None),
|
persistable: Freezable::new(None),
|
||||||
closed: VecDeque::with_capacity(closed_window_count as usize),
|
closed: VecDeque::with_capacity(closed_window_count as usize),
|
||||||
open: None,
|
open: None,
|
||||||
addr,
|
addr,
|
||||||
|
@ -270,7 +270,7 @@ impl PersistenceWindows {
|
||||||
///
|
///
|
||||||
/// Returns `None` if there is an outstanding handle
|
/// Returns `None` if there is an outstanding handle
|
||||||
pub fn flush_handle(&mut self, now: Instant) -> Option<FlushHandle> {
|
pub fn flush_handle(&mut self, now: Instant) -> Option<FlushHandle> {
|
||||||
// Verify no active flush handles
|
// Verify no active flush handles before closing open window
|
||||||
self.persistable.get_mut()?;
|
self.persistable.get_mut()?;
|
||||||
|
|
||||||
// Close current open window if any
|
// Close current open window if any
|
||||||
|
@ -282,7 +282,7 @@ impl PersistenceWindows {
|
||||||
self.rotate(now);
|
self.rotate(now);
|
||||||
|
|
||||||
Some(FlushHandle {
|
Some(FlushHandle {
|
||||||
guard: self.persistable.lock(),
|
handle: self.persistable.try_freeze()?,
|
||||||
closed_count: self.closed.len(),
|
closed_count: self.closed.len(),
|
||||||
addr: self.addr.clone(),
|
addr: self.addr.clone(),
|
||||||
timestamp: self.persistable.as_ref()?.max_time,
|
timestamp: self.persistable.as_ref()?.max_time,
|
||||||
|
@ -294,7 +294,6 @@ impl PersistenceWindows {
|
||||||
pub fn flush(&mut self, handle: FlushHandle) {
|
pub fn flush(&mut self, handle: FlushHandle) {
|
||||||
let closed_count = handle.closed_count;
|
let closed_count = handle.closed_count;
|
||||||
let timestamp = handle.timestamp;
|
let timestamp = handle.timestamp;
|
||||||
std::mem::drop(handle);
|
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
self.closed.len() >= closed_count,
|
self.closed.len() >= closed_count,
|
||||||
|
@ -303,8 +302,7 @@ impl PersistenceWindows {
|
||||||
|
|
||||||
let persistable = self
|
let persistable = self
|
||||||
.persistable
|
.persistable
|
||||||
.get_mut()
|
.unfreeze(handle.handle)
|
||||||
.expect("expected no active locks")
|
|
||||||
.take()
|
.take()
|
||||||
.expect("expected persistable window");
|
.expect("expected persistable window");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue