feat: add flush guard to PersistenceWindows (#1883)

* feat: add flush guard to PersistenceWindows

* docs: Update comments based on code review

* fix: fmt

Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
pull/24376/head
Raphael Taylor-Davies 2021-07-02 21:15:33 +01:00 committed by GitHub
parent b4534883fe
commit 5fe49aa017
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 338 additions and 44 deletions

View File

@ -0,0 +1,72 @@
use std::ops::Deref;
use std::sync::{Arc, Weak};
/// A wrapper around a type T that allows acquiring `ReadGuard` that prevents
/// modification to the data within the `ReadLock` until the `ReadGuard` is dropped
///
/// ```
/// use internal_types::guard::ReadLock;
/// let mut data = ReadLock::new(0);
/// assert!(data.get_mut().is_some());
///
/// *data.get_mut().unwrap() = 32;
/// assert_eq!(*data, 32);
///
/// let guard = data.lock();
/// assert!(data.get_mut().is_none());
///
/// let guard2 = data.lock();
/// std::mem::drop(guard);
///
/// assert!(data.get_mut().is_none());
/// std::mem::drop(guard2);
///
/// // Can get mutable access as guards dropped
/// assert!(data.get_mut().is_some());
/// ```
///
/// The construction relies on the borrow checker to prevent creating
/// new `ReadGuard` whilst there is an active mutable borrow
///
/// ```compile_fail
/// use internal_types::guard::ReadLock;
/// let mut data = ReadLock::new(0);
/// let mut_borrow = data.get_mut();
///
/// data.lock(); // Shouldn't compile
///
/// std::mem::drop(mut_borrow);
/// ```
///
#[derive(Debug)]
pub struct ReadLock<T>(Arc<T>);
impl<T> ReadLock<T> {
pub fn new(payload: T) -> Self {
Self(Arc::new(payload))
}
/// Returns a new `ReadGuard` that prevents modification
/// of the contents of `ReadLock` until dropped
pub fn lock(&self) -> ReadGuard<T> {
ReadGuard(Arc::downgrade(&self.0))
}
/// Try to get access to the mutable data
///
/// Returns `None` if there are outstanding `ReadGuard`
pub fn get_mut(&mut self) -> Option<&mut T> {
Arc::get_mut(&mut self.0)
}
}
impl<T> Deref for ReadLock<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug)]
pub struct ReadGuard<T>(Weak<T>);

View File

@ -6,6 +6,7 @@
clippy::clone_on_ref_ptr
)]
pub mod guard;
pub mod once;
pub mod schema;
pub mod selection;

View File

@ -1,12 +1,13 @@
//! In memory structures for tracking data ingest and when persistence can or should occur.
use entry::Sequence;
use std::{
collections::{BTreeMap, VecDeque},
time::{Duration, Instant},
};
use chrono::{DateTime, Utc};
use chrono::{DateTime, TimeZone, Utc};
use entry::Sequence;
use internal_types::guard::{ReadGuard, ReadLock};
const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30);
@ -17,15 +18,27 @@ const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30);
/// The sequencer_id in the code below will map to a Kafka partition. The sequence_number maps
/// to a Kafka offset. Because IOx will run without Kafka, we use the more generic terms rather
/// than the Kafka terminology.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PersistenceWindows {
persistable: Option<Window>,
persistable: ReadLock<Option<Window>>,
closed: VecDeque<Window>,
open: Option<Window>,
late_arrival_period: Duration,
closed_window_period: Duration,
}
/// A handle for flushing data from the `PersistenceWindows`
/// while preventing additional modification to the `persistable` list
#[derive(Debug)]
pub struct FlushHandle {
guard: ReadGuard<Option<Window>>,
/// The number of closed windows at the time of the handle's creation
///
/// This identifies the windows that can have their
/// minimum timestamps truncated on flush
closed_count: usize,
}
impl PersistenceWindows {
pub fn new(late_arrival_period: Duration) -> Self {
let closed_window_period = late_arrival_period.min(DEFAULT_CLOSED_WINDOW_PERIOD);
@ -36,7 +49,7 @@ impl PersistenceWindows {
let closed_window_count = late_arrival_seconds / closed_window_seconds;
Self {
persistable: None,
persistable: ReadLock::new(None),
closed: VecDeque::with_capacity(closed_window_count as usize),
open: None,
late_arrival_period,
@ -112,9 +125,7 @@ impl PersistenceWindows {
self.persistable.as_ref().map(|w| w.max_time)
}
/// rotates open window to closed if past time and any closed windows to persistable. The lifecycle manager
/// should clone all persistence windows and call this method before checking on persistable_age
/// to see if the time threshold has been crossed.
/// rotates open window to closed if past time and any closed windows to persistable.
pub fn rotate(&mut self, now: Instant) {
let rotate = self
.open
@ -126,38 +137,65 @@ impl PersistenceWindows {
self.closed.push_back(self.open.take().unwrap())
}
while let Some(w) = self.closed.pop_front() {
if now.duration_since(w.created_at) >= self.late_arrival_period {
match self.persistable.as_mut() {
Some(persistable_window) => persistable_window.add_window(w),
None => self.persistable = Some(w),
// if there is no ongoing persistence operation, try and
// add closed windows to the `perstable` list
if let Some(persistable) = self.persistable.get_mut() {
while let Some(w) = self.closed.pop_front() {
if now.duration_since(w.created_at) >= self.late_arrival_period {
match persistable.as_mut() {
Some(persistable_window) => persistable_window.add_window(w),
None => *persistable = Some(w),
}
} else {
self.closed.push_front(w);
break;
}
} else {
self.closed.push_front(w);
break;
}
}
}
/// Clears out the persistable window and sets the min time of any closed and open windows
/// to the greater of either their min time or the end time of the persistable window (known
/// as the max_persistable_timestamp value).
pub fn flush(&mut self) {
if let Some(t) = self.max_persistable_timestamp() {
for w in &mut self.closed {
if w.min_time < t {
w.min_time = t;
/// Acquire a handle that prevents mutation of the persistable window until dropped
///
/// Returns `None` if there is an outstanding handle
pub fn flush_handle(&mut self) -> Option<FlushHandle> {
// Verify no active flush handles
self.persistable.get_mut()?;
Some(FlushHandle {
guard: self.persistable.lock(),
closed_count: self.closed.len(),
})
}
/// Clears out the persistable window
pub fn flush(&mut self, handle: FlushHandle) {
let closed_count = handle.closed_count;
std::mem::drop(handle);
assert!(
self.closed.len() >= closed_count,
"windows dropped from closed whilst locked"
);
let persistable = self
.persistable
.get_mut()
.expect("expected no active locks");
if let Some(persistable) = persistable {
// Everything up to and including persistable max time will have been persisted
let new_min = Utc.timestamp_nanos(persistable.max_time.timestamp_nanos() + 1);
for w in self.closed.iter_mut().take(closed_count) {
if w.min_time < new_min {
w.min_time = new_min;
if w.max_time < new_min {
w.max_time = new_min;
w.row_count = 0;
}
}
}
if let Some(w) = self.open.as_mut() {
if w.min_time < t {
w.min_time = t;
}
};
}
self.persistable = None;
*persistable = None;
}
/// Returns the unpersisted sequencer numbers that represent the min
@ -600,7 +638,8 @@ mod tests {
let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
w.flush();
let handle = w.flush_handle().unwrap();
w.flush(handle);
assert!(w.persistable.is_none());
let mins = w.closed[0].sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
@ -670,6 +709,9 @@ mod tests {
let max_time = w.max_persistable_timestamp().unwrap();
assert_eq!(max_time, first_end);
let flushed_time = max_time + chrono::Duration::nanoseconds(1);
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2);
@ -678,7 +720,8 @@ mod tests {
let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
w.flush();
let flush = w.flush_handle().unwrap();
w.flush(flush);
assert!(w.persistable.is_none());
let mins = w.closed[0].sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
@ -686,7 +729,7 @@ mod tests {
// the first closed window should have a min time equal to the flush
let c = &w.closed[0];
assert_eq!(c.row_count, 3);
assert_eq!(c.min_time, max_time);
assert_eq!(c.min_time, flushed_time);
assert_eq!(c.max_time, second_end);
assert_eq!(c.created_at, second_created_at);
@ -746,6 +789,9 @@ mod tests {
let max_time = w.max_persistable_timestamp().unwrap();
assert_eq!(max_time, first_end);
let flushed_time = max_time + chrono::Duration::nanoseconds(1);
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2);
@ -754,7 +800,8 @@ mod tests {
let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
w.flush();
let flush = w.flush_handle().unwrap();
w.flush(flush);
assert!(w.persistable.is_none());
let mins = w.closed[0].sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
@ -762,14 +809,14 @@ mod tests {
// the closed window should have a min time equal to the flush
let c = &w.closed[0];
assert_eq!(c.row_count, 3);
assert_eq!(c.min_time, first_end);
assert_eq!(c.min_time, flushed_time);
assert_eq!(c.max_time, second_end);
assert_eq!(c.created_at, second_created_at);
// the open window should have a min time equal to max_time
// the open window should not have been modified by the flush
let c = w.open.as_ref().unwrap();
assert_eq!(c.row_count, 2);
assert_eq!(c.min_time, max_time);
assert_eq!(c.min_time, third_start);
assert_eq!(c.max_time, third_end);
assert_eq!(c.created_at, third_created_at);
}
@ -825,6 +872,9 @@ mod tests {
let max_time = w.max_persistable_timestamp().unwrap();
assert_eq!(max_time, first_end);
let flushed_time = max_time + chrono::Duration::nanoseconds(1);
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2);
@ -834,9 +884,10 @@ mod tests {
let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
// after flush we should see no more persistable window and the closed and open windows
// after flush we should see no more persistable window and the closed windows
// should have min timestamps equal to the previous flush end.
w.flush();
let flush = w.flush_handle().unwrap();
w.flush(flush);
assert!(w.persistable.is_none());
let mins = w.closed[0].sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
@ -844,15 +895,185 @@ mod tests {
// the closed window should have a min time equal to the flush
let c = &w.closed[0];
assert_eq!(c.row_count, 3);
assert_eq!(c.min_time, max_time);
assert_eq!(c.min_time, flushed_time);
assert_eq!(c.max_time, second_end);
assert_eq!(c.created_at, second_created_at);
// the open window should have a min time equal to max_time
// the open window should not have been modified by the flush
let c = w.open.as_ref().unwrap();
assert_eq!(c.row_count, 2);
assert_eq!(c.min_time, max_time);
assert_eq!(c.min_time, third_start);
assert_eq!(c.max_time, third_end);
assert_eq!(c.created_at, third_created_at);
}
#[test]
fn test_flush_guard() {
let mut w = PersistenceWindows::new(Duration::from_secs(120));
let instant = Instant::now();
let start = Utc::now();
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
start,
start + chrono::Duration::seconds(2),
instant,
);
w.rotate(instant + Duration::from_secs(120));
assert!(w.persistable.is_some());
assert_eq!(w.persistable_row_count(), 2);
assert_eq!(
w.max_persistable_timestamp().unwrap(),
start + chrono::Duration::seconds(2)
);
w.add_range(
Some(&Sequence { id: 1, number: 4 }),
5,
start,
start + chrono::Duration::seconds(4),
instant + Duration::from_secs(120),
);
// Should rotate into closed
w.rotate(instant + Duration::from_secs(120) + DEFAULT_CLOSED_WINDOW_PERIOD);
assert_eq!(w.closed.len(), 1);
let guard = w.flush_handle().unwrap();
// Should only allow one at once
assert!(w.flush_handle().is_none());
// This should not rotate into persistable as active flush guard
w.rotate(instant + Duration::from_secs(240));
assert_eq!(w.persistable_row_count(), 2);
// Flush persistable window
w.flush(guard);
assert_eq!(w.persistable_row_count(), 0);
// This should rotate into persistable
w.rotate(instant + Duration::from_secs(240));
assert_eq!(w.persistable_row_count(), 5);
// Min time should have been truncated by persist operation to be
// 3 nanosecond more than was persisted
let truncated_time =
start + chrono::Duration::seconds(2) + chrono::Duration::nanoseconds(1);
assert_eq!(w.persistable.as_ref().unwrap().min_time, truncated_time);
let guard = w.flush_handle().unwrap();
w.add_range(
Some(&Sequence { id: 1, number: 9 }),
9,
start,
start + chrono::Duration::seconds(2),
instant + Duration::from_secs(240),
);
// Should rotate into closed
w.rotate(instant + Duration::from_secs(240) + DEFAULT_CLOSED_WINDOW_PERIOD);
assert_eq!(w.closed.len(), 1);
// This should not rotate into persistable as active flush guard
w.rotate(instant + Duration::from_secs(360));
assert_eq!(w.persistable_row_count(), 5);
std::mem::drop(guard);
// This should rotate into persistable
w.rotate(instant + Duration::from_secs(360));
assert_eq!(w.persistable_row_count(), 5 + 9);
assert_eq!(w.persistable.as_ref().unwrap().min_time, start);
}
#[test]
fn test_flush_guard_multiple_closed() {
let mut w = PersistenceWindows::new(DEFAULT_CLOSED_WINDOW_PERIOD * 3);
let instant = Instant::now();
let start = Utc::now();
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
start,
start + chrono::Duration::seconds(2),
instant,
);
w.add_range(
Some(&Sequence { id: 1, number: 6 }),
5,
start,
start + chrono::Duration::seconds(4),
instant + DEFAULT_CLOSED_WINDOW_PERIOD,
);
w.add_range(
Some(&Sequence { id: 1, number: 9 }),
9,
start,
start + chrono::Duration::seconds(2),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 2,
);
w.add_range(
Some(&Sequence { id: 1, number: 9 }),
17,
start,
start + chrono::Duration::seconds(2),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3,
);
assert_eq!(w.closed.len(), 2);
assert_eq!(w.closed[0].row_count, 5);
assert_eq!(w.closed[1].row_count, 9);
assert_eq!(w.open.as_ref().unwrap().row_count, 17);
let flush = w.flush_handle().unwrap();
assert_eq!(w.persistable_row_count(), 2);
assert_eq!(flush.closed_count, 2);
w.add_range(
Some(&Sequence { id: 1, number: 14 }),
11,
start,
start + chrono::Duration::seconds(2),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 4,
);
w.rotate(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 5);
// Despite time passing persistable window shouldn't have changed due to flush guard
assert_eq!(w.persistable_row_count(), 2);
assert_eq!(w.closed.len(), 4);
w.flush(flush);
let flush_time = start + chrono::Duration::seconds(2) + chrono::Duration::nanoseconds(1);
assert!(w.persistable.is_none());
assert_eq!(w.closed.len(), 4);
assert_eq!(w.closed[0].min_time, flush_time);
assert_eq!(w.closed[0].max_time, start + chrono::Duration::seconds(4));
assert_eq!(w.closed[0].row_count, 5);
assert_eq!(w.closed[1].min_time, flush_time);
assert_eq!(w.closed[1].max_time, flush_time);
assert_eq!(w.closed[1].row_count, 0); // Entirely flushed window
// Window closed after flush handle - should be left alone
assert_eq!(w.closed[2].min_time, start);
assert_eq!(w.closed[2].max_time, start + chrono::Duration::seconds(2));
assert_eq!(w.closed[2].row_count, 17); // Entirely flushed window
// Window created after flush handle - should be left alone
assert_eq!(w.closed[3].min_time, start);
assert_eq!(w.closed[3].max_time, start + chrono::Duration::seconds(2));
assert_eq!(w.closed[3].row_count, 11);
}
}