From 8e5d5928cf700201fe9b0f17b506ebc6dc1b47ac Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 20 Jul 2021 09:46:52 +0100 Subject: [PATCH] feat: compute WriteSummary from PersistenceWindows (#2030) (#2054) * feat: compute WriteSummary from PersistenceWindows (#2030) * chore: review feedback Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- data_types/src/lib.rs | 1 + data_types/src/write_summary.rs | 20 +++ .../src/persistence_windows.rs | 148 +++++++++++++++++- 3 files changed, 164 insertions(+), 5 deletions(-) create mode 100644 data_types/src/write_summary.rs diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index bea9629bc3..76d7ca0306 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -22,3 +22,4 @@ pub mod names; pub mod partition_metadata; pub mod server_id; pub mod timestamp; +pub mod write_summary; diff --git a/data_types/src/write_summary.rs b/data_types/src/write_summary.rs new file mode 100644 index 0000000000..9574910262 --- /dev/null +++ b/data_types/src/write_summary.rs @@ -0,0 +1,20 @@ +use chrono::{DateTime, Utc}; + +/// A description of a set of writes +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct WriteSummary { + /// The wall clock timestamp of the first write in this summary + pub time_of_first_write: DateTime, + + /// The wall clock timestamp of the last write in this summary + pub time_of_last_write: DateTime, + + /// The minimum row timestamp for data in this summary + pub min_timestamp: DateTime, + + /// The maximum row timestamp value for data in this summary + pub max_timestamp: DateTime, + + /// The number of rows in this summary + pub row_count: usize, +} diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 8dd287f5ba..1b5ae73d29 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -7,7 +7,7 @@ 
use std::{ use chrono::{DateTime, TimeZone, Utc}; -use data_types::partition_metadata::PartitionAddr; +use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary}; use entry::Sequence; use internal_types::guard::{ReadGuard, ReadLock}; @@ -45,6 +45,16 @@ pub struct PersistenceWindows { late_arrival_period: Duration, closed_window_period: Duration, + /// The datetime this PersistenceWindows was created + /// + /// `PersistenceWindows` internally uses monotonic `Instant`, however, + /// these cannot be rendered. To provide a stable rendering of Wall timestamp, + /// a single timestamp is recorded at creation time + created_at_time: DateTime, + + /// The instant this PersistenceWindows was created + created_at_instant: Instant, + /// The last instant passed to PersistenceWindows::add_range last_instant: Instant, @@ -106,6 +116,9 @@ impl PersistenceWindows { let closed_window_count = late_arrival_seconds / closed_window_seconds; + let created_at_time = Utc::now(); + let created_at_instant = Instant::now(); + Self { persistable: ReadLock::new(None), closed: VecDeque::with_capacity(closed_window_count as usize), @@ -113,7 +126,9 @@ impl PersistenceWindows { addr, late_arrival_period, closed_window_period, - last_instant: Instant::now(), + created_at_time, + created_at_instant, + last_instant: created_at_instant, max_sequence_numbers: Default::default(), } } @@ -165,7 +180,7 @@ impl PersistenceWindows { self.rotate(received_at); match self.open.as_mut() { - Some(w) => w.add_range(sequence, row_count, min_time, max_time), + Some(w) => w.add_range(sequence, row_count, min_time, max_time, received_at), None => { self.open = Some(Window::new( received_at, @@ -335,6 +350,34 @@ impl PersistenceWindows { self.windows().next() } + /// Returns approximate summaries of the unpersisted writes + /// recorded by this PersistenceWindows instance + /// + /// These are approximate because persistence may partially flush a window, which will + /// update the 
min row timestamp but not the row count + pub fn summaries(&self) -> impl Iterator + '_ { + self.windows().map(move |window| { + let window_age = chrono::Duration::from_std( + window.created_at.duration_since(self.created_at_instant), + ) + .expect("duration overflow"); + + let time_of_first_write = self.created_at_time + window_age; + + let window_duration = + chrono::Duration::from_std(window.last_instant.duration_since(window.created_at)) + .expect("duration overflow"); + + WriteSummary { + time_of_first_write, + time_of_last_write: time_of_first_write + window_duration, + min_timestamp: window.min_time, + max_timestamp: window.max_time, + row_count: window.row_count, + } + }) + } + /// Returns true if this PersistenceWindows instance is empty pub fn is_empty(&self) -> bool { self.minimum_window().is_none() @@ -374,9 +417,14 @@ struct Window { /// The server time when this window was created. Used to determine how long data in this /// window has been sitting in memory. created_at: Instant, + /// The server time of the last write to this window + last_instant: Instant, + /// The number of rows in the window row_count: usize, - min_time: DateTime, // min time value for data in the window - max_time: DateTime, // max time value for data in the window + /// min time value for data in the window + min_time: DateTime, + /// max time value for data in the window + max_time: DateTime, /// maps sequencer_id to the minimum and maximum sequence numbers seen sequencer_numbers: BTreeMap, } @@ -399,6 +447,7 @@ impl Window { Self { created_at, + last_instant: created_at, row_count, min_time, max_time, @@ -414,7 +463,11 @@ impl Window { row_count: usize, min_time: DateTime, max_time: DateTime, + instant: Instant, ) { + assert!(self.created_at <= instant); + self.last_instant = instant; + self.row_count += row_count; if self.min_time > min_time { self.min_time = min_time; @@ -1265,4 +1318,89 @@ mod tests { assert_eq!(w.closed[1].max_time, start + chrono::Duration::seconds(2)); 
assert_eq!(w.closed[1].row_count, 11); } + + #[test] + fn test_summaries() { + let mut w = make_windows(Duration::from_secs(100)); + let instant = w.created_at_instant; + + // Window 1 + w.add_range( + Some(&Sequence { id: 1, number: 1 }), + 11, + Utc.timestamp_nanos(10), + Utc.timestamp_nanos(11), + instant + Duration::from_millis(1), + ); + + w.add_range( + Some(&Sequence { id: 1, number: 2 }), + 4, + Utc.timestamp_nanos(10), + Utc.timestamp_nanos(340), + instant + Duration::from_millis(30), + ); + + w.add_range( + Some(&Sequence { id: 1, number: 3 }), + 6, + Utc.timestamp_nanos(1), + Utc.timestamp_nanos(5), + instant + Duration::from_millis(50), + ); + + // More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 1 => Window 2 + w.add_range( + Some(&Sequence { id: 1, number: 4 }), + 3, + Utc.timestamp_nanos(89), + Utc.timestamp_nanos(90), + instant + DEFAULT_CLOSED_WINDOW_PERIOD + Duration::from_millis(1), + ); + + // More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 2 => Window 3 + w.add_range( + Some(&Sequence { id: 1, number: 5 }), + 8, + Utc.timestamp_nanos(3), + Utc.timestamp_nanos(4), + instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3, + ); + + let closed_duration = chrono::Duration::from_std(DEFAULT_CLOSED_WINDOW_PERIOD).unwrap(); + + let summaries: Vec<_> = w.summaries().collect(); + + assert_eq!(summaries.len(), 3); + assert_eq!( + summaries, + vec![ + WriteSummary { + time_of_first_write: w.created_at_time + chrono::Duration::milliseconds(1), + time_of_last_write: w.created_at_time + chrono::Duration::milliseconds(50), + min_timestamp: Utc.timestamp_nanos(1), + max_timestamp: Utc.timestamp_nanos(340), + row_count: 21 + }, + WriteSummary { + time_of_first_write: w.created_at_time + + closed_duration + + chrono::Duration::milliseconds(1), + time_of_last_write: w.created_at_time + + closed_duration + + chrono::Duration::milliseconds(1), + min_timestamp: Utc.timestamp_nanos(89), + max_timestamp: Utc.timestamp_nanos(90), + row_count: 3 + }, + 
WriteSummary { + time_of_first_write: w.created_at_time + closed_duration * 3, + time_of_last_write: w.created_at_time + closed_duration * 3, + min_timestamp: Utc.timestamp_nanos(3), + max_timestamp: Utc.timestamp_nanos(4), + row_count: 8 + }, + ] + ) + } }