diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs index ac051b69dc..0ac0f86564 100644 --- a/lifecycle/src/lib.rs +++ b/lifecycle/src/lib.rs @@ -45,7 +45,7 @@ pub trait LifecycleDb { pub trait LockablePartition: Sized + std::fmt::Display { type Partition: LifecyclePartition; type Chunk: LockableChunk; - type PersistHandle: Send + Sync + 'static; + type PersistHandle: PersistHandle + Send + Sync + 'static; type Error: std::error::Error + Send + Sync; @@ -78,10 +78,12 @@ pub trait LockablePartition: Sized + std::fmt::Display { /// Returns None if there is a persistence operation in flight, or /// if there are no persistable windows. /// - /// TODO: This interface is nasty + /// `now` is the wall clock time that should be used to compute how long a given + /// write has been present in memory fn prepare_persist( partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, - ) -> Option<(Self::PersistHandle, DateTime)>; + now: Instant, + ) -> Option; /// Split and persist chunks. /// @@ -97,7 +99,6 @@ pub trait LockablePartition: Sized + std::fmt::Display { fn persist_chunks( partition: LifecycleWriteGuard<'_, Self::Partition, Self>, chunks: Vec::Chunk, Self::Chunk>>, - max_persistable_timestamp: DateTime, handle: Self::PersistHandle, ) -> Result::Job>, Self::Error>; @@ -162,7 +163,10 @@ pub trait LifecyclePartition { fn is_persisted(&self) -> bool; /// Returns an approximation of the number of rows that can be persisted - fn persistable_row_count(&self) -> usize; + /// + /// `now` is the wall clock time that should be used to compute how long a given + /// write has been present in memory + fn persistable_row_count(&self, now: Instant) -> usize; /// Returns the age of the oldest unpersisted write fn minimum_unpersisted_age(&self) -> Option; @@ -187,3 +191,10 @@ pub trait LifecycleChunk { fn row_count(&self) -> usize; } + +/// The trait for a persist handle +pub trait PersistHandle { + /// Any unpersisted chunks containing rows with timestamps less than or equal to this + /// must be included in the corresponding `LockablePartition::persist_chunks` call + fn timestamp(&self) -> DateTime; +} diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 33f1a90362..09cdb385c3 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -11,7 +11,10 @@ use data_types::database_rules::{LifecycleRules, DEFAULT_MUB_ROW_THRESHOLD}; use observability_deps::tracing::{debug, info, trace, warn}; use tracker::TaskTracker; -use crate::{LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition}; +use crate::{ + LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition, + PersistHandle, +}; /// Number of seconds to wait before retying a failed lifecycle action pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10); @@ -321,14 +324,14 @@ where }) .unwrap_or_default() as u32; + let persistable_row_count = partition.persistable_row_count(now); debug!(%db_name, %partition, - partition_persist_row_count=partition.persistable_row_count(), + partition_persist_row_count=persistable_row_count, rules_persist_row_count=%rules.persist_row_threshold.get(), partition_persistable_age_seconds=persistable_age_seconds, rules_persist_age_threshold_seconds=%rules.persist_age_threshold_seconds.get(), "considering for persistence"); - let persistable_row_count = partition.persistable_row_count(); if persistable_row_count >= rules.persist_row_threshold.get() { info!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold"); } else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() { @@ -343,9 +346,7 @@ where // Upgrade partition to be able to rotate persistence windows let mut partition = partition.upgrade(); - let (persist_handle, max_persistable_timestamp) = match LockablePartition::prepare_persist( - &mut partition, - ) { + let persist_handle = match LockablePartition::prepare_persist(&mut partition, now) { Some(x) => x, None => { debug!(%db_name, %partition, "no persistable windows or previous outstanding persist"); @@ -375,7 +376,7 @@ where // Chunk's data is entirely after the time we are flushing // up to, and thus there is reason to include it in the // plan - if chunk.min_timestamp() > max_persistable_timestamp { + if chunk.min_timestamp() > persist_handle.timestamp() { // Can safely ignore chunk debug!(%db_name, %partition, chunk=%chunk.addr(), "chunk does not contain data eligible for persistence"); @@ -404,14 +405,9 @@ where .map(|chunk| chunk.upgrade()) .collect(); - let tracker = LockablePartition::persist_chunks( - partition, - chunks, - max_persistable_timestamp, - persist_handle, - ) - .expect("failed to persist chunks") - .with_metadata(ChunkLifecycleAction::Persisting); + let tracker = LockablePartition::persist_chunks(partition, chunks, persist_handle) + .expect("failed to persist chunks") + .with_metadata(ChunkLifecycleAction::Persisting); self.trackers.push(tracker); false @@ -620,7 +616,7 @@ mod tests { use crate::{ ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk, - LockablePartition, + LockablePartition, PersistHandle, }; use super::*; @@ -750,10 +746,21 @@ mod tests { chunk: Arc>, } + #[derive(Debug)] + struct TestPersistHandle { + timestamp: DateTime, + } + + impl PersistHandle for TestPersistHandle { + fn timestamp(&self) -> DateTime { + self.timestamp + } + } + impl<'a> LockablePartition for TestLockablePartition<'a> { type Partition = TestPartition; type Chunk = TestLockableChunk<'a>; - type PersistHandle = (); + type PersistHandle = TestPersistHandle; type Error = Infallible; fn read(&self) -> LifecycleReadGuard<'_, Self::Partition, Self> { @@ -818,17 +825,18 @@ mod tests { fn prepare_persist( partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, - ) -> Option<(Self::PersistHandle, DateTime)> { - Some(((), partition.max_persistable_timestamp.unwrap())) + _now: Instant, + ) -> Option { + Some(TestPersistHandle { + timestamp: partition.max_persistable_timestamp.unwrap(), + }) } fn persist_chunks( mut partition: LifecycleWriteGuard<'_, TestPartition, Self>, chunks: Vec>, - _max_persistable_timestamp: DateTime, - _handle: Self::PersistHandle, + handle: Self::PersistHandle, ) -> Result, Self::Error> { - let flush_timestamp = partition.max_persistable_timestamp.unwrap(); for chunk in &chunks { partition.chunks.remove(&chunk.addr.chunk_id); } @@ -838,7 +846,7 @@ mod tests { // The remainder left behind after the split let new_chunk = TestChunk::new(id, None, None, ChunkStorage::ReadBuffer) - .with_min_timestamp(flush_timestamp); + .with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1)); partition .chunks @@ -910,7 +918,7 @@ mod tests { false } - fn persistable_row_count(&self) -> usize { + fn persistable_row_count(&self, _now: Instant) -> usize { self.persistable_row_count } diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 6dc736e1d5..40c8ab38c3 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -17,9 +17,20 @@ const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30); /// can be persisted. This allows IOx to receive out of order writes (in their timestamps) while /// persisting mostly in non-time overlapping Parquet files. /// -/// The sequencer_id in the code below will map to a Kafka partition. The sequence_number maps +/// The sequencer_id in the code below will map to a Kafka partition id. The sequence_number maps /// to a Kafka offset. Because IOx will run without Kafka, we use the more generic terms rather /// than the Kafka terminology. +/// +/// The `PersistenceWindows` operate on two different types of time +/// +/// * row timestamps - these are `DateTime` and are the row's value for the `time` column +/// * Wall timestamps - these are `Instant` and are the Wall clock of the system used to determine +/// the "age" of a set of writes within a PersistenceWindow +/// +/// To aid testing Wall timestamps are passed to many methods instead of directly using `Instant::now` +/// +/// The PersistenceWindows answer the question: - "What is the maximum row timestamp in the writes +/// that arrived more than late_arrival_period seconds ago, as determined by wall clock time" #[derive(Debug)] pub struct PersistenceWindows { persistable: ReadLock>, @@ -28,7 +39,7 @@ pub struct PersistenceWindows { late_arrival_period: Duration, closed_window_period: Duration, - /// The last last instant passed to PersistenceWindows::add_range + /// The last instant passed to PersistenceWindows::add_range last_instant: Instant, /// maps sequencer_id to the maximum sequence passed to PersistenceWindows::add_range @@ -36,7 +47,13 @@ pub struct PersistenceWindows { } /// A handle for flushing data from the `PersistenceWindows` -/// while preventing additional modification to the `persistable` list +/// +/// When a `FlushHandle` is created it computes the row timestamp that should be persisted up to +/// +/// It then allows flushing the corresponding writes from the `PersistenceWindows` that were +/// present at the time the `FlushHandle` was created. Even if later writes have been recorded +/// in the `PersistenceWindows` in the intervening time +/// #[derive(Debug)] pub struct FlushHandle { guard: ReadGuard>, @@ -45,6 +62,16 @@ pub struct FlushHandle { /// This identifies the windows that can have their /// minimum timestamps truncated on flush closed_count: usize, + + /// The timestamp to flush + timestamp: DateTime, +} + +impl FlushHandle { + /// Should flush all rows with a timestamp less than or equal to this + pub fn timestamp(&self) -> DateTime { + self.timestamp + } } impl PersistenceWindows { @@ -127,59 +154,37 @@ impl PersistenceWindows { }; } - /// Returns the min time of the open persistence window, if any - pub fn open_min_time(&self) -> Option> { - self.open.as_ref().map(|open| open.min_time) - } - - /// Returns the max time of the open persistence window, if any - pub fn open_max_time(&self) -> Option> { - self.open.as_ref().map(|open| open.max_time) - } - - /// Returns the number of rows that are persistable. These rows could be duplicates and there - /// are other rows that may fall in closed and open that would be pulled into a persistence - /// operation. This number is used to determine if persistence should be triggered, not as - /// an exact number. - pub fn persistable_row_count(&self) -> usize { - self.persistable.as_ref().map(|w| w.row_count).unwrap_or(0) - } - - /// Returns the instant of the oldest persistable data - pub fn persistable_age(&self) -> Option { - self.persistable.as_ref().map(|w| w.created_at) - } - - /// Returns the max timestamp of data in the persistable window. Any unpersisted data with a - /// timestamp <= than this value can be persisted. - pub fn max_persistable_timestamp(&self) -> Option> { - self.persistable.as_ref().map(|w| w.max_time) - } - /// rotates open window to closed if past time and any closed windows to persistable. - pub fn rotate(&mut self, now: Instant) { + /// + /// `now` is the Wall clock time of the server to use for determining how "old" a given + /// persistence window is, or in other words, how long since the writes it contains the + /// metrics for were written to this partition + fn rotate(&mut self, now: Instant) { let rotate = self .open .as_ref() - .map(|w| now.duration_since(w.created_at) >= self.closed_window_period) + .map(|w| w.is_closeable(now, self.closed_window_period)) .unwrap_or(false); if rotate { self.closed.push_back(self.open.take().unwrap()) } + let late_arrival_period = self.late_arrival_period; + // if there is no ongoing persistence operation, try and - // add closed windows to the `perstable` list + // add closed windows to the `persistable` window if let Some(persistable) = self.persistable.get_mut() { - while let Some(w) = self.closed.pop_front() { - if now.duration_since(w.created_at) >= self.late_arrival_period { - match persistable.as_mut() { - Some(persistable_window) => persistable_window.add_window(w), - None => *persistable = Some(w), - } - } else { - self.closed.push_front(w); - break; + while self + .closed + .front() + .map(|w| w.is_persistable(now, late_arrival_period)) + .unwrap_or(false) + { + let w = self.closed.pop_front().unwrap(); + match persistable.as_mut() { + Some(persistable_window) => persistable_window.add_window(w), + None => *persistable = Some(w), } } } @@ -188,18 +193,24 @@ impl PersistenceWindows { /// Acquire a handle that prevents mutation of the persistable window until dropped /// /// Returns `None` if there is an outstanding handle - pub fn flush_handle(&mut self) -> Option { + pub fn flush_handle(&mut self, now: Instant) -> Option { // Verify no active flush handles self.persistable.get_mut()?; + + // Rotate into persistable window + self.rotate(now); + Some(FlushHandle { guard: self.persistable.lock(), closed_count: self.closed.len(), + timestamp: self.persistable.as_ref()?.max_time, }) } /// Clears out the persistable window pub fn flush(&mut self, handle: FlushHandle) { let closed_count = handle.closed_count; + let timestamp = handle.timestamp; std::mem::drop(handle); assert!( @@ -210,40 +221,40 @@ impl PersistenceWindows { let persistable = self .persistable .get_mut() - .expect("expected no active locks"); + .expect("expected no active locks") + .take() + .expect("expected persistable window"); - if let Some(persistable) = persistable { - // Everything up to and including persistable max time will have been persisted - let new_min = Utc.timestamp_nanos(persistable.max_time.timestamp_nanos() + 1); - for w in self.closed.iter_mut().take(closed_count) { - if w.min_time < new_min { - w.min_time = new_min; - if w.max_time < new_min { - w.max_time = new_min; - w.row_count = 0; - } + assert_eq!( + persistable.max_time, timestamp, + "persistable max time doesn't match handle" + ); + + // Everything up to and including persistable max time will have been persisted + let new_min = Utc.timestamp_nanos(persistable.max_time.timestamp_nanos() + 1); + for w in self.closed.iter_mut().take(closed_count) { + if w.min_time < new_min { + w.min_time = new_min; + if w.max_time < new_min { + w.max_time = new_min; + w.row_count = 0; } } } + } - *persistable = None; + /// Returns an iterator over the windows starting with the oldest + fn windows(&self) -> impl Iterator { + self.persistable + .as_ref() + .into_iter() + .chain(self.closed.iter()) + .chain(self.open.as_ref().into_iter()) } /// Returns the minimum window fn minimum_window(&self) -> Option<&Window> { - if let Some(w) = self.persistable.as_ref() { - return Some(w); - } - - if let Some(w) = self.closed.front() { - return Some(w); - } - - if let Some(w) = self.open.as_ref() { - return Some(w); - } - - None + self.windows().next() } /// Returns the unpersisted sequencer numbers that represent the min @@ -255,6 +266,24 @@ impl PersistenceWindows { pub fn minimum_unpersisted_age(&self) -> Option { self.minimum_window().map(|x| x.created_at) } + + /// Returns the minimum unpersisted timestamp + pub fn minimum_unpersisted_timestamp(&self) -> Option> { + self.windows().map(|x| x.min_time).min() + } + + /// Returns the maximum unpersisted timestamp + pub fn maximum_unpersisted_timestamp(&self) -> Option> { + self.windows().map(|x| x.max_time).max() + } + + /// Returns the number of persistable rows + pub fn persistable_row_count(&self, now: Instant) -> usize { + self.windows() + .take_while(|window| window.is_persistable(now, self.late_arrival_period)) + .map(|window| window.row_count) + .sum() + } } #[derive(Debug, Clone)] @@ -347,6 +376,20 @@ impl Window { } } } + + /// If this window can be closed + fn is_closeable(&self, now: Instant, closed_window_period: Duration) -> bool { + now.checked_duration_since(self.created_at) + .map(|x| x >= closed_window_period) + .unwrap_or(false) + } + + /// If this window is persistable + fn is_persistable(&self, now: Instant, late_arrival_period: Duration) -> bool { + now.checked_duration_since(self.created_at) + .map(|x| x >= late_arrival_period) + .unwrap_or(false) + } } #[cfg(test)] @@ -390,7 +433,6 @@ mod tests { Instant::now(), ); - assert_eq!(w.persistable_row_count(), 0); assert!(w.closed.is_empty()); assert!(w.persistable.is_none()); let open = w.open.unwrap(); @@ -441,7 +483,7 @@ mod tests { after_close_threshold, ); - assert_eq!(w.persistable_row_count(), 0); + assert!(w.persistable.is_none()); let closed = w.closed.get(0).unwrap(); assert_eq!( @@ -501,7 +543,7 @@ mod tests { third_created_at, ); - assert_eq!(w.persistable_row_count(), 0); + assert!(w.persistable.is_none()); // confirm the two on closed and third on open let c = w.closed.get(0).cloned().unwrap(); assert_eq!(c.created_at, created_at); @@ -533,9 +575,6 @@ mod tests { fourth_created_at, ); - assert_eq!(w.persistable_row_count(), 5); - assert_eq!(w.persistable_age(), Some(created_at)); - // confirm persistable has first and second let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); @@ -561,9 +600,6 @@ mod tests { fifth_created_at, ); - assert_eq!(w.persistable_row_count(), 10); - assert_eq!(w.persistable_age(), Some(created_at)); - let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); assert_eq!(c.row_count, 10); @@ -621,8 +657,6 @@ mod tests { w.rotate(end_at); - let max_time = w.max_persistable_timestamp().unwrap(); - assert_eq!(max_time, first_end); let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); assert_eq!(c.row_count, 2); @@ -631,8 +665,10 @@ mod tests { let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); - let handle = w.flush_handle().unwrap(); + + let handle = w.flush_handle(end_at).unwrap(); w.flush(handle); + assert!(w.persistable.is_none()); let mins = w.closed[0].sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); @@ -700,11 +736,6 @@ mod tests { w.rotate(end_at); - let max_time = w.max_persistable_timestamp().unwrap(); - assert_eq!(max_time, first_end); - - let flushed_time = max_time + chrono::Duration::nanoseconds(1); - let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); assert_eq!(c.row_count, 2); @@ -713,16 +744,20 @@ mod tests { let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); - let flush = w.flush_handle().unwrap(); + let flush = w.flush_handle(end_at).unwrap(); + + assert_eq!(flush.timestamp(), first_end); + let truncated_time = flush.timestamp() + chrono::Duration::nanoseconds(1); + w.flush(flush); assert!(w.persistable.is_none()); let mins = w.closed[0].sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); - // the first closed window should have a min time equal to the flush + // the first closed window should have a min time truncated by the flush let c = &w.closed[0]; assert_eq!(c.row_count, 3); - assert_eq!(c.min_time, flushed_time); + assert_eq!(c.min_time, truncated_time); assert_eq!(c.max_time, second_end); assert_eq!(c.created_at, second_created_at); @@ -780,11 +815,6 @@ mod tests { w.rotate(end_at); - let max_time = w.max_persistable_timestamp().unwrap(); - assert_eq!(max_time, first_end); - - let flushed_time = max_time + chrono::Duration::nanoseconds(1); - let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); assert_eq!(c.row_count, 2); @@ -793,7 +823,11 @@ mod tests { let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); - let flush = w.flush_handle().unwrap(); + + let flush = w.flush_handle(end_at).unwrap(); + assert_eq!(flush.timestamp(), first_end); + let flushed_time = flush.timestamp() + chrono::Duration::nanoseconds(1); + w.flush(flush); assert!(w.persistable.is_none()); let mins = w.closed[0].sequencer_numbers.clone(); @@ -860,14 +894,6 @@ mod tests { third_created_at, ); - // this should rotate the first window into persistable - w.rotate(end_at); - - let max_time = w.max_persistable_timestamp().unwrap(); - assert_eq!(max_time, first_end); - - let flushed_time = max_time + chrono::Duration::nanoseconds(1); - let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); assert_eq!(c.row_count, 2); @@ -877,9 +903,14 @@ mod tests { let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); + // this should rotate the first window into persistable // after flush we should see no more persistable window and the closed windows // should have min timestamps equal to the previous flush end. - let flush = w.flush_handle().unwrap(); + let flush = w.flush_handle(end_at).unwrap(); + + assert_eq!(flush.timestamp(), first_end); + let flushed_time = flush.timestamp() + chrono::Duration::nanoseconds(1); + w.flush(flush); assert!(w.persistable.is_none()); let mins = w.closed[0].sequencer_numbers.clone(); @@ -917,9 +948,9 @@ mod tests { w.rotate(instant + Duration::from_secs(120)); assert!(w.persistable.is_some()); - assert_eq!(w.persistable_row_count(), 2); + assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); assert_eq!( - w.max_persistable_timestamp().unwrap(), + w.persistable.as_ref().unwrap().max_time, start + chrono::Duration::seconds(2) ); @@ -935,30 +966,34 @@ mod tests { w.rotate(instant + Duration::from_secs(120) + DEFAULT_CLOSED_WINDOW_PERIOD); assert_eq!(w.closed.len(), 1); - let guard = w.flush_handle().unwrap(); + let guard = w + .flush_handle(instant + Duration::from_secs(120) + DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); + // Should only allow one at once - assert!(w.flush_handle().is_none()); + assert!(w.flush_handle(instant).is_none()); // This should not rotate into persistable as active flush guard w.rotate(instant + Duration::from_secs(240)); - assert_eq!(w.persistable_row_count(), 2); + assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); + + let flush_t = guard.timestamp(); + assert_eq!(flush_t, start + chrono::Duration::seconds(2)); // Flush persistable window w.flush(guard); - assert_eq!(w.persistable_row_count(), 0); + assert!(w.persistable.is_none()); // This should rotate into persistable w.rotate(instant + Duration::from_secs(240)); - assert_eq!(w.persistable_row_count(), 5); + assert_eq!(w.persistable.as_ref().unwrap().row_count, 5); // Min time should have been truncated by persist operation to be - // 3 nanosecond more than was persisted - let truncated_time = - start + chrono::Duration::seconds(2) + chrono::Duration::nanoseconds(1); - + // 1 nanosecond more than was persisted + let truncated_time = flush_t + chrono::Duration::nanoseconds(1); assert_eq!(w.persistable.as_ref().unwrap().min_time, truncated_time); - let guard = w.flush_handle().unwrap(); + let guard = w.flush_handle(instant + Duration::from_secs(240)).unwrap(); w.add_range( Some(&Sequence { id: 1, number: 9 }), @@ -974,12 +1009,12 @@ mod tests { // This should not rotate into persistable as active flush guard w.rotate(instant + Duration::from_secs(360)); - assert_eq!(w.persistable_row_count(), 5); + assert_eq!(w.persistable.as_ref().unwrap().row_count, 5); std::mem::drop(guard); // This should rotate into persistable w.rotate(instant + Duration::from_secs(360)); - assert_eq!(w.persistable_row_count(), 5 + 9); + assert_eq!(w.persistable.as_ref().unwrap().row_count, 5 + 9); assert_eq!(w.persistable.as_ref().unwrap().min_time, start); } @@ -1027,9 +1062,17 @@ mod tests { assert_eq!(w.closed[1].row_count, 9); assert_eq!(w.open.as_ref().unwrap().row_count, 17); - let flush = w.flush_handle().unwrap(); - assert_eq!(w.persistable_row_count(), 2); + let flush = w + .flush_handle(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3) + .unwrap(); + + let flush_t = flush.timestamp(); + assert_eq!(flush.closed_count, 2); + assert_eq!(flush_t, start + chrono::Duration::seconds(2)); + let truncated_time = flush_t + chrono::Duration::nanoseconds(1); + + assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); w.add_range( Some(&Sequence { id: 1, number: 14 }), @@ -1042,21 +1085,20 @@ mod tests { w.rotate(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 5); // Despite time passing persistable window shouldn't have changed due to flush guard - assert_eq!(w.persistable_row_count(), 2); + assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); assert_eq!(w.closed.len(), 4); w.flush(flush); - let flush_time = start + chrono::Duration::seconds(2) + chrono::Duration::nanoseconds(1); assert!(w.persistable.is_none()); assert_eq!(w.closed.len(), 4); - assert_eq!(w.closed[0].min_time, flush_time); + assert_eq!(w.closed[0].min_time, truncated_time); assert_eq!(w.closed[0].max_time, start + chrono::Duration::seconds(4)); assert_eq!(w.closed[0].row_count, 5); - assert_eq!(w.closed[1].min_time, flush_time); - assert_eq!(w.closed[1].max_time, flush_time); + assert_eq!(w.closed[1].min_time, truncated_time); + assert_eq!(w.closed[1].max_time, truncated_time); assert_eq!(w.closed[1].row_count, 0); // Entirely flushed window // Window closed after flush handle - should be left alone diff --git a/server/src/db.rs b/server/src/db.rs index 8d114be11c..4aea194e72 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -2074,8 +2074,8 @@ mod tests { assert_eq!(&table_summary.name, "cpu"); assert_eq!(table_summary.count(), 2); let windows = partition.persistence_windows().unwrap(); - let open_min = windows.open_min_time().unwrap(); - let open_max = windows.open_max_time().unwrap(); + let open_min = windows.minimum_unpersisted_timestamp().unwrap(); + let open_max = windows.maximum_unpersisted_timestamp().unwrap(); assert_eq!(open_min.timestamp_nanos(), 10); assert_eq!(open_max.timestamp_nanos(), 20); } diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 1a49b8f10f..12f1e6094c 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -112,6 +112,18 @@ impl LockableChunk for LockableCatalogChunk { } } +/// A newtype wrapper around persistence_windows::FlushHandle +/// +/// Represents the context for flushing data out of the PersistenceWindows +#[derive(Debug)] +pub struct CatalogPersistHandle(FlushHandle); + +impl lifecycle::PersistHandle for CatalogPersistHandle { + fn timestamp(&self) -> DateTime { + self.0.timestamp() + } +} + /// /// A `LockableCatalogPartition` combines a `Partition` with its owning `Db` /// @@ -155,7 +167,7 @@ impl LockablePartition for LockableCatalogPartition { type Chunk = LockableCatalogChunk; - type PersistHandle = FlushHandle; + type PersistHandle = CatalogPersistHandle; type Error = super::lifecycle::Error; @@ -200,25 +212,21 @@ impl LockablePartition for LockableCatalogPartition { fn prepare_persist( partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, - ) -> Option<(Self::PersistHandle, DateTime)> { + now: Instant, + ) -> Option { let window = partition.persistence_windows_mut().unwrap(); - window.rotate(Instant::now()); - - let max_persistable_timestamp = window.max_persistable_timestamp(); - let handle = window.flush_handle(); - trace!(?max_persistable_timestamp, ?handle, "preparing for persist"); - Some((handle?, max_persistable_timestamp?)) + let handle = window.flush_handle(now); + trace!(?handle, "preparing for persist"); + Some(CatalogPersistHandle(handle?)) } fn persist_chunks( partition: LifecycleWriteGuard<'_, Partition, Self>, chunks: Vec>, - max_persistable_timestamp: DateTime, - handle: FlushHandle, + handle: Self::PersistHandle, ) -> Result, Self::Error> { info!(table=%partition.table_name(), partition=%partition.partition_key(), "persisting chunks"); - let (tracker, fut) = - persist::persist_chunks(partition, chunks, max_persistable_timestamp, handle)?; + let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0)?; let _ = tokio::spawn(async move { fut.await.log_if_error("persisting chunks") }); Ok(tracker) } @@ -268,9 +276,9 @@ impl LifecyclePartition for Partition { .unwrap_or(true) } - fn persistable_row_count(&self) -> usize { + fn persistable_row_count(&self, now: Instant) -> usize { self.persistence_windows() - .map(|w| w.persistable_row_count()) + .map(|w| w.persistable_row_count(now)) .unwrap_or(0) } diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index bae47c00a7..d115a1f70b 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -3,7 +3,6 @@ use std::future::Future; use std::sync::Arc; -use chrono::{DateTime, Utc}; use data_types::job::Job; use lifecycle::{LifecycleWriteGuard, LockableChunk}; use observability_deps::tracing::info; @@ -28,7 +27,6 @@ use persistence_windows::persistence_windows::FlushHandle; pub(super) fn persist_chunks( partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, chunks: Vec>, - max_persistable_timestamp: DateTime, flush_handle: FlushHandle, ) -> Result<( TaskTracker, @@ -42,6 +40,7 @@ pub(super) fn persist_chunks( info!(%table_name, %partition_key, ?chunk_ids, "splitting and persisting chunks"); + let max_persistable_timestamp = flush_handle.timestamp(); let flush_timestamp = max_persistable_timestamp.timestamp_nanos(); let (tracker, registration) = db.jobs.register(Job::PersistChunks {