feat: make persistence windows interface harder to use incorrectly (#1977)

* feat: make persistence windows interface harder to use incorrectly

* chore: review feedback

* chore: update comment

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-07-14 14:03:18 +01:00 committed by GitHub
parent 8ac22dda4c
commit f1c1620c84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 241 additions and 173 deletions

View File

@ -45,7 +45,7 @@ pub trait LifecycleDb {
pub trait LockablePartition: Sized + std::fmt::Display { pub trait LockablePartition: Sized + std::fmt::Display {
type Partition: LifecyclePartition; type Partition: LifecyclePartition;
type Chunk: LockableChunk; type Chunk: LockableChunk;
type PersistHandle: Send + Sync + 'static; type PersistHandle: PersistHandle + Send + Sync + 'static;
type Error: std::error::Error + Send + Sync; type Error: std::error::Error + Send + Sync;
@ -78,10 +78,12 @@ pub trait LockablePartition: Sized + std::fmt::Display {
/// Returns None if there is a persistence operation in flight, or /// Returns None if there is a persistence operation in flight, or
/// if there are no persistable windows. /// if there are no persistable windows.
/// ///
/// TODO: This interface is nasty /// `now` is the wall clock time that should be used to compute how long a given
/// write has been present in memory
fn prepare_persist( fn prepare_persist(
partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>,
) -> Option<(Self::PersistHandle, DateTime<Utc>)>; now: Instant,
) -> Option<Self::PersistHandle>;
/// Split and persist chunks. /// Split and persist chunks.
/// ///
@ -97,7 +99,6 @@ pub trait LockablePartition: Sized + std::fmt::Display {
fn persist_chunks( fn persist_chunks(
partition: LifecycleWriteGuard<'_, Self::Partition, Self>, partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
chunks: Vec<LifecycleWriteGuard<'_, <Self::Chunk as LockableChunk>::Chunk, Self::Chunk>>, chunks: Vec<LifecycleWriteGuard<'_, <Self::Chunk as LockableChunk>::Chunk, Self::Chunk>>,
max_persistable_timestamp: DateTime<Utc>,
handle: Self::PersistHandle, handle: Self::PersistHandle,
) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::Error>; ) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::Error>;
@ -162,7 +163,10 @@ pub trait LifecyclePartition {
fn is_persisted(&self) -> bool; fn is_persisted(&self) -> bool;
/// Returns an approximation of the number of rows that can be persisted /// Returns an approximation of the number of rows that can be persisted
fn persistable_row_count(&self) -> usize; ///
/// `now` is the wall clock time that should be used to compute how long a given
/// write has been present in memory
fn persistable_row_count(&self, now: Instant) -> usize;
/// Returns the age of the oldest unpersisted write /// Returns the age of the oldest unpersisted write
fn minimum_unpersisted_age(&self) -> Option<Instant>; fn minimum_unpersisted_age(&self) -> Option<Instant>;
@ -187,3 +191,10 @@ pub trait LifecycleChunk {
fn row_count(&self) -> usize; fn row_count(&self) -> usize;
} }
/// The trait for a persist handle
///
/// `LockablePartition::PersistHandle` is bounded by this trait: a handle is what
/// `LockablePartition::prepare_persist` returns and what
/// `LockablePartition::persist_chunks` consumes
pub trait PersistHandle {
/// Returns the timestamp that must be persisted up to (inclusive)
///
/// Any unpersisted chunks containing rows with timestamps less than or equal to this
/// must be included in the corresponding `LockablePartition::persist_chunks` call
fn timestamp(&self) -> DateTime<Utc>;
}

View File

@ -11,7 +11,10 @@ use data_types::database_rules::{LifecycleRules, DEFAULT_MUB_ROW_THRESHOLD};
use observability_deps::tracing::{debug, info, trace, warn}; use observability_deps::tracing::{debug, info, trace, warn};
use tracker::TaskTracker; use tracker::TaskTracker;
use crate::{LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition}; use crate::{
LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition,
PersistHandle,
};
/// Number of seconds to wait before retrying a failed lifecycle action /// Number of seconds to wait before retrying a failed lifecycle action
pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10); pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
@ -321,14 +324,14 @@ where
}) })
.unwrap_or_default() as u32; .unwrap_or_default() as u32;
let persistable_row_count = partition.persistable_row_count(now);
debug!(%db_name, %partition, debug!(%db_name, %partition,
partition_persist_row_count=partition.persistable_row_count(), partition_persist_row_count=persistable_row_count,
rules_persist_row_count=%rules.persist_row_threshold.get(), rules_persist_row_count=%rules.persist_row_threshold.get(),
partition_persistable_age_seconds=persistable_age_seconds, partition_persistable_age_seconds=persistable_age_seconds,
rules_persist_age_threshold_seconds=%rules.persist_age_threshold_seconds.get(), rules_persist_age_threshold_seconds=%rules.persist_age_threshold_seconds.get(),
"considering for persistence"); "considering for persistence");
let persistable_row_count = partition.persistable_row_count();
if persistable_row_count >= rules.persist_row_threshold.get() { if persistable_row_count >= rules.persist_row_threshold.get() {
info!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold"); info!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold");
} else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() { } else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() {
@ -343,9 +346,7 @@ where
// Upgrade partition to be able to rotate persistence windows // Upgrade partition to be able to rotate persistence windows
let mut partition = partition.upgrade(); let mut partition = partition.upgrade();
let (persist_handle, max_persistable_timestamp) = match LockablePartition::prepare_persist( let persist_handle = match LockablePartition::prepare_persist(&mut partition, now) {
&mut partition,
) {
Some(x) => x, Some(x) => x,
None => { None => {
debug!(%db_name, %partition, "no persistable windows or previous outstanding persist"); debug!(%db_name, %partition, "no persistable windows or previous outstanding persist");
@ -375,7 +376,7 @@ where
// Chunk's data is entirely after the time we are flushing // Chunk's data is entirely after the time we are flushing
// up to, and thus there is no reason to include it in the // up to, and thus there is no reason to include it in the
// plan // plan
if chunk.min_timestamp() > max_persistable_timestamp { if chunk.min_timestamp() > persist_handle.timestamp() {
// Can safely ignore chunk // Can safely ignore chunk
debug!(%db_name, %partition, chunk=%chunk.addr(), debug!(%db_name, %partition, chunk=%chunk.addr(),
"chunk does not contain data eligible for persistence"); "chunk does not contain data eligible for persistence");
@ -404,14 +405,9 @@ where
.map(|chunk| chunk.upgrade()) .map(|chunk| chunk.upgrade())
.collect(); .collect();
let tracker = LockablePartition::persist_chunks( let tracker = LockablePartition::persist_chunks(partition, chunks, persist_handle)
partition, .expect("failed to persist chunks")
chunks, .with_metadata(ChunkLifecycleAction::Persisting);
max_persistable_timestamp,
persist_handle,
)
.expect("failed to persist chunks")
.with_metadata(ChunkLifecycleAction::Persisting);
self.trackers.push(tracker); self.trackers.push(tracker);
false false
@ -620,7 +616,7 @@ mod tests {
use crate::{ use crate::{
ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk, ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
LockablePartition, LockablePartition, PersistHandle,
}; };
use super::*; use super::*;
@ -750,10 +746,21 @@ mod tests {
chunk: Arc<RwLock<TestChunk>>, chunk: Arc<RwLock<TestChunk>>,
} }
/// Test-only persist handle that just carries the timestamp to persist up to
#[derive(Debug)]
struct TestPersistHandle {
// Value reported by `PersistHandle::timestamp`
timestamp: DateTime<Utc>,
}
impl PersistHandle for TestPersistHandle {
// Returns the stored timestamp unchanged
fn timestamp(&self) -> DateTime<Utc> {
self.timestamp
}
}
impl<'a> LockablePartition for TestLockablePartition<'a> { impl<'a> LockablePartition for TestLockablePartition<'a> {
type Partition = TestPartition; type Partition = TestPartition;
type Chunk = TestLockableChunk<'a>; type Chunk = TestLockableChunk<'a>;
type PersistHandle = (); type PersistHandle = TestPersistHandle;
type Error = Infallible; type Error = Infallible;
fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self> { fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self> {
@ -818,17 +825,18 @@ mod tests {
fn prepare_persist( fn prepare_persist(
partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>,
) -> Option<(Self::PersistHandle, DateTime<Utc>)> { _now: Instant,
Some(((), partition.max_persistable_timestamp.unwrap())) ) -> Option<Self::PersistHandle> {
Some(TestPersistHandle {
timestamp: partition.max_persistable_timestamp.unwrap(),
})
} }
fn persist_chunks( fn persist_chunks(
mut partition: LifecycleWriteGuard<'_, TestPartition, Self>, mut partition: LifecycleWriteGuard<'_, TestPartition, Self>,
chunks: Vec<LifecycleWriteGuard<'_, TestChunk, Self::Chunk>>, chunks: Vec<LifecycleWriteGuard<'_, TestChunk, Self::Chunk>>,
_max_persistable_timestamp: DateTime<Utc>, handle: Self::PersistHandle,
_handle: Self::PersistHandle,
) -> Result<TaskTracker<()>, Self::Error> { ) -> Result<TaskTracker<()>, Self::Error> {
let flush_timestamp = partition.max_persistable_timestamp.unwrap();
for chunk in &chunks { for chunk in &chunks {
partition.chunks.remove(&chunk.addr.chunk_id); partition.chunks.remove(&chunk.addr.chunk_id);
} }
@ -838,7 +846,7 @@ mod tests {
// The remainder left behind after the split // The remainder left behind after the split
let new_chunk = TestChunk::new(id, None, None, ChunkStorage::ReadBuffer) let new_chunk = TestChunk::new(id, None, None, ChunkStorage::ReadBuffer)
.with_min_timestamp(flush_timestamp); .with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1));
partition partition
.chunks .chunks
@ -910,7 +918,7 @@ mod tests {
false false
} }
fn persistable_row_count(&self) -> usize { fn persistable_row_count(&self, _now: Instant) -> usize {
self.persistable_row_count self.persistable_row_count
} }

View File

@ -17,9 +17,20 @@ const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30);
/// can be persisted. This allows IOx to receive out of order writes (in their timestamps) while /// can be persisted. This allows IOx to receive out of order writes (in their timestamps) while
/// persisting mostly in non-time overlapping Parquet files. /// persisting mostly in non-time overlapping Parquet files.
/// ///
/// The sequencer_id in the code below will map to a Kafka partition. The sequence_number maps /// The sequencer_id in the code below will map to a Kafka partition id. The sequence_number maps
/// to a Kafka offset. Because IOx will run without Kafka, we use the more generic terms rather /// to a Kafka offset. Because IOx will run without Kafka, we use the more generic terms rather
/// than the Kafka terminology. /// than the Kafka terminology.
///
/// The `PersistenceWindows` operate on two different types of time:
///
/// * Row timestamps - these are `DateTime<Utc>` and are the row's value for the `time` column
/// * Wall timestamps - these are `Instant` and are the wall clock of the system used to determine
/// the "age" of a set of writes within a PersistenceWindow
///
/// To aid testing, wall timestamps are passed to many methods instead of directly using `Instant::now`.
///
/// The `PersistenceWindows` answer the question: "What is the maximum row timestamp in the writes
/// that arrived more than `late_arrival_period` seconds ago, as determined by wall clock time?"
#[derive(Debug)] #[derive(Debug)]
pub struct PersistenceWindows { pub struct PersistenceWindows {
persistable: ReadLock<Option<Window>>, persistable: ReadLock<Option<Window>>,
@ -28,7 +39,7 @@ pub struct PersistenceWindows {
late_arrival_period: Duration, late_arrival_period: Duration,
closed_window_period: Duration, closed_window_period: Duration,
/// The last last instant passed to PersistenceWindows::add_range /// The last instant passed to PersistenceWindows::add_range
last_instant: Instant, last_instant: Instant,
/// maps sequencer_id to the maximum sequence passed to PersistenceWindows::add_range /// maps sequencer_id to the maximum sequence passed to PersistenceWindows::add_range
@ -36,7 +47,13 @@ pub struct PersistenceWindows {
} }
/// A handle for flushing data from the `PersistenceWindows` /// A handle for flushing data from the `PersistenceWindows`
/// while preventing additional modification to the `persistable` list ///
/// When a `FlushHandle` is created it computes the row timestamp that should be persisted up to
///
/// It then allows flushing the corresponding writes from the `PersistenceWindows` that were
/// present at the time the `FlushHandle` was created, even if later writes have been recorded
/// in the `PersistenceWindows` in the intervening time.
///
#[derive(Debug)] #[derive(Debug)]
pub struct FlushHandle { pub struct FlushHandle {
guard: ReadGuard<Option<Window>>, guard: ReadGuard<Option<Window>>,
@ -45,6 +62,16 @@ pub struct FlushHandle {
/// This identifies the windows that can have their /// This identifies the windows that can have their
/// minimum timestamps truncated on flush /// minimum timestamps truncated on flush
closed_count: usize, closed_count: usize,
/// The timestamp to flush
timestamp: DateTime<Utc>,
}
impl FlushHandle {
/// Should flush all rows with a timestamp less than or equal to this
///
/// `PersistenceWindows::flush_handle` sets this to the `max_time` of the persistable
/// window when the handle is created, and `PersistenceWindows::flush` asserts that the
/// persistable window's `max_time` still matches it
pub fn timestamp(&self) -> DateTime<Utc> {
self.timestamp
}
} }
impl PersistenceWindows { impl PersistenceWindows {
@ -127,59 +154,37 @@ impl PersistenceWindows {
}; };
} }
/// Returns the min time of the open persistence window, if any
pub fn open_min_time(&self) -> Option<DateTime<Utc>> {
self.open.as_ref().map(|open| open.min_time)
}
/// Returns the max time of the open persistence window, if any
pub fn open_max_time(&self) -> Option<DateTime<Utc>> {
self.open.as_ref().map(|open| open.max_time)
}
/// Returns the number of rows that are persistable. These rows could be duplicates and there
/// are other rows that may fall in closed and open that would be pulled into a persistence
/// operation. This number is used to determine if persistence should be triggered, not as
/// an exact number.
pub fn persistable_row_count(&self) -> usize {
self.persistable.as_ref().map(|w| w.row_count).unwrap_or(0)
}
/// Returns the instant of the oldest persistable data
pub fn persistable_age(&self) -> Option<Instant> {
self.persistable.as_ref().map(|w| w.created_at)
}
/// Returns the max timestamp of data in the persistable window. Any unpersisted data with a
/// timestamp <= than this value can be persisted.
pub fn max_persistable_timestamp(&self) -> Option<DateTime<Utc>> {
self.persistable.as_ref().map(|w| w.max_time)
}
/// rotates open window to closed if past time and any closed windows to persistable. /// rotates open window to closed if past time and any closed windows to persistable.
pub fn rotate(&mut self, now: Instant) { ///
/// `now` is the Wall clock time of the server to use for determining how "old" a given
/// persistence window is, or in other words, how long since the writes it contains the
/// metrics for were written to this partition
fn rotate(&mut self, now: Instant) {
let rotate = self let rotate = self
.open .open
.as_ref() .as_ref()
.map(|w| now.duration_since(w.created_at) >= self.closed_window_period) .map(|w| w.is_closeable(now, self.closed_window_period))
.unwrap_or(false); .unwrap_or(false);
if rotate { if rotate {
self.closed.push_back(self.open.take().unwrap()) self.closed.push_back(self.open.take().unwrap())
} }
let late_arrival_period = self.late_arrival_period;
// if there is no ongoing persistence operation, try and // if there is no ongoing persistence operation, try and
// add closed windows to the `perstable` list // add closed windows to the `persistable` window
if let Some(persistable) = self.persistable.get_mut() { if let Some(persistable) = self.persistable.get_mut() {
while let Some(w) = self.closed.pop_front() { while self
if now.duration_since(w.created_at) >= self.late_arrival_period { .closed
match persistable.as_mut() { .front()
Some(persistable_window) => persistable_window.add_window(w), .map(|w| w.is_persistable(now, late_arrival_period))
None => *persistable = Some(w), .unwrap_or(false)
} {
} else { let w = self.closed.pop_front().unwrap();
self.closed.push_front(w); match persistable.as_mut() {
break; Some(persistable_window) => persistable_window.add_window(w),
None => *persistable = Some(w),
} }
} }
} }
@ -188,18 +193,24 @@ impl PersistenceWindows {
/// Acquire a handle that prevents mutation of the persistable window until dropped /// Acquire a handle that prevents mutation of the persistable window until dropped
/// ///
/// Returns `None` if there is an outstanding handle /// Returns `None` if there is an outstanding handle
pub fn flush_handle(&mut self) -> Option<FlushHandle> { pub fn flush_handle(&mut self, now: Instant) -> Option<FlushHandle> {
// Verify no active flush handles // Verify no active flush handles
self.persistable.get_mut()?; self.persistable.get_mut()?;
// Rotate into persistable window
self.rotate(now);
Some(FlushHandle { Some(FlushHandle {
guard: self.persistable.lock(), guard: self.persistable.lock(),
closed_count: self.closed.len(), closed_count: self.closed.len(),
timestamp: self.persistable.as_ref()?.max_time,
}) })
} }
/// Clears out the persistable window /// Clears out the persistable window
pub fn flush(&mut self, handle: FlushHandle) { pub fn flush(&mut self, handle: FlushHandle) {
let closed_count = handle.closed_count; let closed_count = handle.closed_count;
let timestamp = handle.timestamp;
std::mem::drop(handle); std::mem::drop(handle);
assert!( assert!(
@ -210,40 +221,40 @@ impl PersistenceWindows {
let persistable = self let persistable = self
.persistable .persistable
.get_mut() .get_mut()
.expect("expected no active locks"); .expect("expected no active locks")
.take()
.expect("expected persistable window");
if let Some(persistable) = persistable { assert_eq!(
// Everything up to and including persistable max time will have been persisted persistable.max_time, timestamp,
let new_min = Utc.timestamp_nanos(persistable.max_time.timestamp_nanos() + 1); "persistable max time doesn't match handle"
for w in self.closed.iter_mut().take(closed_count) { );
if w.min_time < new_min {
w.min_time = new_min; // Everything up to and including persistable max time will have been persisted
if w.max_time < new_min { let new_min = Utc.timestamp_nanos(persistable.max_time.timestamp_nanos() + 1);
w.max_time = new_min; for w in self.closed.iter_mut().take(closed_count) {
w.row_count = 0; if w.min_time < new_min {
} w.min_time = new_min;
if w.max_time < new_min {
w.max_time = new_min;
w.row_count = 0;
} }
} }
} }
}
*persistable = None; /// Returns an iterator over the windows starting with the oldest
fn windows(&self) -> impl Iterator<Item = &Window> {
self.persistable
.as_ref()
.into_iter()
.chain(self.closed.iter())
.chain(self.open.as_ref().into_iter())
} }
/// Returns the minimum window /// Returns the minimum window
fn minimum_window(&self) -> Option<&Window> { fn minimum_window(&self) -> Option<&Window> {
if let Some(w) = self.persistable.as_ref() { self.windows().next()
return Some(w);
}
if let Some(w) = self.closed.front() {
return Some(w);
}
if let Some(w) = self.open.as_ref() {
return Some(w);
}
None
} }
/// Returns the unpersisted sequencer numbers that represent the min /// Returns the unpersisted sequencer numbers that represent the min
@ -255,6 +266,24 @@ impl PersistenceWindows {
pub fn minimum_unpersisted_age(&self) -> Option<Instant> { pub fn minimum_unpersisted_age(&self) -> Option<Instant> {
self.minimum_window().map(|x| x.created_at) self.minimum_window().map(|x| x.created_at)
} }
/// Returns the minimum unpersisted timestamp
///
/// This is the smallest `min_time` of any window yielded by `windows()`,
/// or `None` if there are no windows
pub fn minimum_unpersisted_timestamp(&self) -> Option<DateTime<Utc>> {
    self.windows()
        .map(|window| window.min_time)
        .reduce(|earliest, candidate| std::cmp::min(earliest, candidate))
}
/// Returns the maximum unpersisted timestamp
///
/// This is the largest `max_time` of any window yielded by `windows()`,
/// or `None` if there are no windows
pub fn maximum_unpersisted_timestamp(&self) -> Option<DateTime<Utc>> {
    self.windows()
        .map(|window| window.max_time)
        .reduce(|latest, candidate| std::cmp::max(latest, candidate))
}
/// Returns the number of persistable rows
///
/// Walks the windows in the order produced by `windows()` and adds up `row_count`
/// for each one that `Window::is_persistable` accepts for the given `now` and this
/// instance's `late_arrival_period`, stopping at the first window that is rejected
pub fn persistable_row_count(&self, now: Instant) -> usize {
    let late_arrival_period = self.late_arrival_period;
    let mut rows = 0;
    for window in self.windows() {
        if !window.is_persistable(now, late_arrival_period) {
            // Later windows are not considered once one is rejected
            break;
        }
        rows += window.row_count;
    }
    rows
}
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -347,6 +376,20 @@ impl Window {
} }
} }
} }
/// If this window can be closed
///
/// True when at least `closed_window_period` has elapsed between `created_at` and `now`
fn is_closeable(&self, now: Instant, closed_window_period: Duration) -> bool {
    match now.checked_duration_since(self.created_at) {
        Some(age) => age >= closed_window_period,
        // `now` is earlier than `created_at`
        None => false,
    }
}
/// If this window is persistable
///
/// True when at least `late_arrival_period` has elapsed between `created_at` and `now`
fn is_persistable(&self, now: Instant, late_arrival_period: Duration) -> bool {
    match now.checked_duration_since(self.created_at) {
        Some(age) => age >= late_arrival_period,
        // `now` is earlier than `created_at`
        None => false,
    }
}
} }
#[cfg(test)] #[cfg(test)]
@ -390,7 +433,6 @@ mod tests {
Instant::now(), Instant::now(),
); );
assert_eq!(w.persistable_row_count(), 0);
assert!(w.closed.is_empty()); assert!(w.closed.is_empty());
assert!(w.persistable.is_none()); assert!(w.persistable.is_none());
let open = w.open.unwrap(); let open = w.open.unwrap();
@ -441,7 +483,7 @@ mod tests {
after_close_threshold, after_close_threshold,
); );
assert_eq!(w.persistable_row_count(), 0); assert!(w.persistable.is_none());
let closed = w.closed.get(0).unwrap(); let closed = w.closed.get(0).unwrap();
assert_eq!( assert_eq!(
@ -501,7 +543,7 @@ mod tests {
third_created_at, third_created_at,
); );
assert_eq!(w.persistable_row_count(), 0); assert!(w.persistable.is_none());
// confirm the two on closed and third on open // confirm the two on closed and third on open
let c = w.closed.get(0).cloned().unwrap(); let c = w.closed.get(0).cloned().unwrap();
assert_eq!(c.created_at, created_at); assert_eq!(c.created_at, created_at);
@ -533,9 +575,6 @@ mod tests {
fourth_created_at, fourth_created_at,
); );
assert_eq!(w.persistable_row_count(), 5);
assert_eq!(w.persistable_age(), Some(created_at));
// confirm persistable has first and second // confirm persistable has first and second
let c = w.persistable.as_ref().unwrap(); let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at); assert_eq!(c.created_at, created_at);
@ -561,9 +600,6 @@ mod tests {
fifth_created_at, fifth_created_at,
); );
assert_eq!(w.persistable_row_count(), 10);
assert_eq!(w.persistable_age(), Some(created_at));
let c = w.persistable.as_ref().unwrap(); let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at); assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 10); assert_eq!(c.row_count, 10);
@ -621,8 +657,6 @@ mod tests {
w.rotate(end_at); w.rotate(end_at);
let max_time = w.max_persistable_timestamp().unwrap();
assert_eq!(max_time, first_end);
let c = w.persistable.as_ref().unwrap(); let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at); assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2); assert_eq!(c.row_count, 2);
@ -631,8 +665,10 @@ mod tests {
let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
let handle = w.flush_handle().unwrap();
let handle = w.flush_handle(end_at).unwrap();
w.flush(handle); w.flush(handle);
assert!(w.persistable.is_none()); assert!(w.persistable.is_none());
let mins = w.closed[0].sequencer_numbers.clone(); let mins = w.closed[0].sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
@ -700,11 +736,6 @@ mod tests {
w.rotate(end_at); w.rotate(end_at);
let max_time = w.max_persistable_timestamp().unwrap();
assert_eq!(max_time, first_end);
let flushed_time = max_time + chrono::Duration::nanoseconds(1);
let c = w.persistable.as_ref().unwrap(); let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at); assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2); assert_eq!(c.row_count, 2);
@ -713,16 +744,20 @@ mod tests {
let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
let flush = w.flush_handle().unwrap(); let flush = w.flush_handle(end_at).unwrap();
assert_eq!(flush.timestamp(), first_end);
let truncated_time = flush.timestamp() + chrono::Duration::nanoseconds(1);
w.flush(flush); w.flush(flush);
assert!(w.persistable.is_none()); assert!(w.persistable.is_none());
let mins = w.closed[0].sequencer_numbers.clone(); let mins = w.closed[0].sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
// the first closed window should have a min time equal to the flush // the first closed window should have a min time truncated by the flush
let c = &w.closed[0]; let c = &w.closed[0];
assert_eq!(c.row_count, 3); assert_eq!(c.row_count, 3);
assert_eq!(c.min_time, flushed_time); assert_eq!(c.min_time, truncated_time);
assert_eq!(c.max_time, second_end); assert_eq!(c.max_time, second_end);
assert_eq!(c.created_at, second_created_at); assert_eq!(c.created_at, second_created_at);
@ -780,11 +815,6 @@ mod tests {
w.rotate(end_at); w.rotate(end_at);
let max_time = w.max_persistable_timestamp().unwrap();
assert_eq!(max_time, first_end);
let flushed_time = max_time + chrono::Duration::nanoseconds(1);
let c = w.persistable.as_ref().unwrap(); let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at); assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2); assert_eq!(c.row_count, 2);
@ -793,7 +823,11 @@ mod tests {
let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
let flush = w.flush_handle().unwrap();
let flush = w.flush_handle(end_at).unwrap();
assert_eq!(flush.timestamp(), first_end);
let flushed_time = flush.timestamp() + chrono::Duration::nanoseconds(1);
w.flush(flush); w.flush(flush);
assert!(w.persistable.is_none()); assert!(w.persistable.is_none());
let mins = w.closed[0].sequencer_numbers.clone(); let mins = w.closed[0].sequencer_numbers.clone();
@ -860,14 +894,6 @@ mod tests {
third_created_at, third_created_at,
); );
// this should rotate the first window into persistable
w.rotate(end_at);
let max_time = w.max_persistable_timestamp().unwrap();
assert_eq!(max_time, first_end);
let flushed_time = max_time + chrono::Duration::nanoseconds(1);
let c = w.persistable.as_ref().unwrap(); let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at); assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2); assert_eq!(c.row_count, 2);
@ -877,9 +903,14 @@ mod tests {
let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone();
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
// this should rotate the first window into persistable
// after flush we should see no more persistable window and the closed windows // after flush we should see no more persistable window and the closed windows
// should have min timestamps equal to the previous flush end. // should have min timestamps equal to the previous flush end.
let flush = w.flush_handle().unwrap(); let flush = w.flush_handle(end_at).unwrap();
assert_eq!(flush.timestamp(), first_end);
let flushed_time = flush.timestamp() + chrono::Duration::nanoseconds(1);
w.flush(flush); w.flush(flush);
assert!(w.persistable.is_none()); assert!(w.persistable.is_none());
let mins = w.closed[0].sequencer_numbers.clone(); let mins = w.closed[0].sequencer_numbers.clone();
@ -917,9 +948,9 @@ mod tests {
w.rotate(instant + Duration::from_secs(120)); w.rotate(instant + Duration::from_secs(120));
assert!(w.persistable.is_some()); assert!(w.persistable.is_some());
assert_eq!(w.persistable_row_count(), 2); assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
assert_eq!( assert_eq!(
w.max_persistable_timestamp().unwrap(), w.persistable.as_ref().unwrap().max_time,
start + chrono::Duration::seconds(2) start + chrono::Duration::seconds(2)
); );
@ -935,30 +966,34 @@ mod tests {
w.rotate(instant + Duration::from_secs(120) + DEFAULT_CLOSED_WINDOW_PERIOD); w.rotate(instant + Duration::from_secs(120) + DEFAULT_CLOSED_WINDOW_PERIOD);
assert_eq!(w.closed.len(), 1); assert_eq!(w.closed.len(), 1);
let guard = w.flush_handle().unwrap(); let guard = w
.flush_handle(instant + Duration::from_secs(120) + DEFAULT_CLOSED_WINDOW_PERIOD)
.unwrap();
// Should only allow one at once // Should only allow one at once
assert!(w.flush_handle().is_none()); assert!(w.flush_handle(instant).is_none());
// This should not rotate into persistable as active flush guard // This should not rotate into persistable as active flush guard
w.rotate(instant + Duration::from_secs(240)); w.rotate(instant + Duration::from_secs(240));
assert_eq!(w.persistable_row_count(), 2); assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
let flush_t = guard.timestamp();
assert_eq!(flush_t, start + chrono::Duration::seconds(2));
// Flush persistable window // Flush persistable window
w.flush(guard); w.flush(guard);
assert_eq!(w.persistable_row_count(), 0); assert!(w.persistable.is_none());
// This should rotate into persistable // This should rotate into persistable
w.rotate(instant + Duration::from_secs(240)); w.rotate(instant + Duration::from_secs(240));
assert_eq!(w.persistable_row_count(), 5); assert_eq!(w.persistable.as_ref().unwrap().row_count, 5);
// Min time should have been truncated by persist operation to be // Min time should have been truncated by persist operation to be
// 3 nanosecond more than was persisted // 1 nanosecond more than was persisted
let truncated_time = let truncated_time = flush_t + chrono::Duration::nanoseconds(1);
start + chrono::Duration::seconds(2) + chrono::Duration::nanoseconds(1);
assert_eq!(w.persistable.as_ref().unwrap().min_time, truncated_time); assert_eq!(w.persistable.as_ref().unwrap().min_time, truncated_time);
let guard = w.flush_handle().unwrap(); let guard = w.flush_handle(instant + Duration::from_secs(240)).unwrap();
w.add_range( w.add_range(
Some(&Sequence { id: 1, number: 9 }), Some(&Sequence { id: 1, number: 9 }),
@ -974,12 +1009,12 @@ mod tests {
// This should not rotate into persistable as active flush guard // This should not rotate into persistable as active flush guard
w.rotate(instant + Duration::from_secs(360)); w.rotate(instant + Duration::from_secs(360));
assert_eq!(w.persistable_row_count(), 5); assert_eq!(w.persistable.as_ref().unwrap().row_count, 5);
std::mem::drop(guard); std::mem::drop(guard);
// This should rotate into persistable // This should rotate into persistable
w.rotate(instant + Duration::from_secs(360)); w.rotate(instant + Duration::from_secs(360));
assert_eq!(w.persistable_row_count(), 5 + 9); assert_eq!(w.persistable.as_ref().unwrap().row_count, 5 + 9);
assert_eq!(w.persistable.as_ref().unwrap().min_time, start); assert_eq!(w.persistable.as_ref().unwrap().min_time, start);
} }
@ -1027,9 +1062,17 @@ mod tests {
assert_eq!(w.closed[1].row_count, 9); assert_eq!(w.closed[1].row_count, 9);
assert_eq!(w.open.as_ref().unwrap().row_count, 17); assert_eq!(w.open.as_ref().unwrap().row_count, 17);
let flush = w.flush_handle().unwrap(); let flush = w
assert_eq!(w.persistable_row_count(), 2); .flush_handle(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3)
.unwrap();
let flush_t = flush.timestamp();
assert_eq!(flush.closed_count, 2); assert_eq!(flush.closed_count, 2);
assert_eq!(flush_t, start + chrono::Duration::seconds(2));
let truncated_time = flush_t + chrono::Duration::nanoseconds(1);
assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
w.add_range( w.add_range(
Some(&Sequence { id: 1, number: 14 }), Some(&Sequence { id: 1, number: 14 }),
@ -1042,21 +1085,20 @@ mod tests {
w.rotate(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 5); w.rotate(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 5);
// Despite time passing persistable window shouldn't have changed due to flush guard // Despite time passing persistable window shouldn't have changed due to flush guard
assert_eq!(w.persistable_row_count(), 2); assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
assert_eq!(w.closed.len(), 4); assert_eq!(w.closed.len(), 4);
w.flush(flush); w.flush(flush);
let flush_time = start + chrono::Duration::seconds(2) + chrono::Duration::nanoseconds(1);
assert!(w.persistable.is_none()); assert!(w.persistable.is_none());
assert_eq!(w.closed.len(), 4); assert_eq!(w.closed.len(), 4);
assert_eq!(w.closed[0].min_time, flush_time); assert_eq!(w.closed[0].min_time, truncated_time);
assert_eq!(w.closed[0].max_time, start + chrono::Duration::seconds(4)); assert_eq!(w.closed[0].max_time, start + chrono::Duration::seconds(4));
assert_eq!(w.closed[0].row_count, 5); assert_eq!(w.closed[0].row_count, 5);
assert_eq!(w.closed[1].min_time, flush_time); assert_eq!(w.closed[1].min_time, truncated_time);
assert_eq!(w.closed[1].max_time, flush_time); assert_eq!(w.closed[1].max_time, truncated_time);
assert_eq!(w.closed[1].row_count, 0); // Entirely flushed window assert_eq!(w.closed[1].row_count, 0); // Entirely flushed window
// Window closed after flush handle - should be left alone // Window closed after flush handle - should be left alone

View File

@ -2074,8 +2074,8 @@ mod tests {
assert_eq!(&table_summary.name, "cpu"); assert_eq!(&table_summary.name, "cpu");
assert_eq!(table_summary.count(), 2); assert_eq!(table_summary.count(), 2);
let windows = partition.persistence_windows().unwrap(); let windows = partition.persistence_windows().unwrap();
let open_min = windows.open_min_time().unwrap(); let open_min = windows.minimum_unpersisted_timestamp().unwrap();
let open_max = windows.open_max_time().unwrap(); let open_max = windows.maximum_unpersisted_timestamp().unwrap();
assert_eq!(open_min.timestamp_nanos(), 10); assert_eq!(open_min.timestamp_nanos(), 10);
assert_eq!(open_max.timestamp_nanos(), 20); assert_eq!(open_max.timestamp_nanos(), 20);
} }

View File

@ -112,6 +112,18 @@ impl LockableChunk for LockableCatalogChunk {
} }
} }
/// A newtype wrapper around `persistence_windows::FlushHandle`.
///
/// Represents the context for flushing data out of the `PersistenceWindows`.
/// The wrapped handle is held as the single tuple field; this wrapper is the
/// type on which `lifecycle::PersistHandle` is implemented.
#[derive(Debug)]
pub struct CatalogPersistHandle(FlushHandle);
impl lifecycle::PersistHandle for CatalogPersistHandle {
/// Returns the timestamp reported by the wrapped `FlushHandle`.
///
/// This is a plain delegation to `FlushHandle::timestamp`; no additional
/// computation is performed here.
fn timestamp(&self) -> DateTime<Utc> {
self.0.timestamp()
}
}
/// ///
/// A `LockableCatalogPartition` combines a `Partition` with its owning `Db` /// A `LockableCatalogPartition` combines a `Partition` with its owning `Db`
/// ///
@ -155,7 +167,7 @@ impl LockablePartition for LockableCatalogPartition {
type Chunk = LockableCatalogChunk; type Chunk = LockableCatalogChunk;
type PersistHandle = FlushHandle; type PersistHandle = CatalogPersistHandle;
type Error = super::lifecycle::Error; type Error = super::lifecycle::Error;
@ -200,25 +212,21 @@ impl LockablePartition for LockableCatalogPartition {
fn prepare_persist( fn prepare_persist(
partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>,
) -> Option<(Self::PersistHandle, DateTime<Utc>)> { now: Instant,
) -> Option<Self::PersistHandle> {
let window = partition.persistence_windows_mut().unwrap(); let window = partition.persistence_windows_mut().unwrap();
window.rotate(Instant::now()); let handle = window.flush_handle(now);
trace!(?handle, "preparing for persist");
let max_persistable_timestamp = window.max_persistable_timestamp(); Some(CatalogPersistHandle(handle?))
let handle = window.flush_handle();
trace!(?max_persistable_timestamp, ?handle, "preparing for persist");
Some((handle?, max_persistable_timestamp?))
} }
fn persist_chunks( fn persist_chunks(
partition: LifecycleWriteGuard<'_, Partition, Self>, partition: LifecycleWriteGuard<'_, Partition, Self>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, Self::Chunk>>, chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, Self::Chunk>>,
max_persistable_timestamp: DateTime<Utc>, handle: Self::PersistHandle,
handle: FlushHandle,
) -> Result<TaskTracker<Job>, Self::Error> { ) -> Result<TaskTracker<Job>, Self::Error> {
info!(table=%partition.table_name(), partition=%partition.partition_key(), "persisting chunks"); info!(table=%partition.table_name(), partition=%partition.partition_key(), "persisting chunks");
let (tracker, fut) = let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0)?;
persist::persist_chunks(partition, chunks, max_persistable_timestamp, handle)?;
let _ = tokio::spawn(async move { fut.await.log_if_error("persisting chunks") }); let _ = tokio::spawn(async move { fut.await.log_if_error("persisting chunks") });
Ok(tracker) Ok(tracker)
} }
@ -268,9 +276,9 @@ impl LifecyclePartition for Partition {
.unwrap_or(true) .unwrap_or(true)
} }
fn persistable_row_count(&self) -> usize { fn persistable_row_count(&self, now: Instant) -> usize {
self.persistence_windows() self.persistence_windows()
.map(|w| w.persistable_row_count()) .map(|w| w.persistable_row_count(now))
.unwrap_or(0) .unwrap_or(0)
} }

View File

@ -3,7 +3,6 @@
use std::future::Future; use std::future::Future;
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, Utc};
use data_types::job::Job; use data_types::job::Job;
use lifecycle::{LifecycleWriteGuard, LockableChunk}; use lifecycle::{LifecycleWriteGuard, LockableChunk};
use observability_deps::tracing::info; use observability_deps::tracing::info;
@ -28,7 +27,6 @@ use persistence_windows::persistence_windows::FlushHandle;
pub(super) fn persist_chunks( pub(super) fn persist_chunks(
partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>,
chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>, chunks: Vec<LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>>,
max_persistable_timestamp: DateTime<Utc>,
flush_handle: FlushHandle, flush_handle: FlushHandle,
) -> Result<( ) -> Result<(
TaskTracker<Job>, TaskTracker<Job>,
@ -42,6 +40,7 @@ pub(super) fn persist_chunks(
info!(%table_name, %partition_key, ?chunk_ids, "splitting and persisting chunks"); info!(%table_name, %partition_key, ?chunk_ids, "splitting and persisting chunks");
let max_persistable_timestamp = flush_handle.timestamp();
let flush_timestamp = max_persistable_timestamp.timestamp_nanos(); let flush_timestamp = max_persistable_timestamp.timestamp_nanos();
let (tracker, registration) = db.jobs.register(Job::PersistChunks { let (tracker, registration) = db.jobs.register(Job::PersistChunks {