feat: compute WriteSummary from PersistenceWindows (#2030) (#2054)

* feat: compute WriteSummary from PersistenceWindows (#2030)

* chore: review feedback

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-07-20 09:46:52 +01:00 committed by GitHub
parent 07c8dea6b9
commit 8e5d5928cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 164 additions and 5 deletions

View File

@ -22,3 +22,4 @@ pub mod names;
pub mod partition_metadata;
pub mod server_id;
pub mod timestamp;
pub mod write_summary;

View File

@ -0,0 +1,20 @@
use chrono::{DateTime, Utc};
/// A description of a set of writes
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WriteSummary {
/// The wall clock timestamp of the last write in this summary
pub time_of_first_write: DateTime<Utc>,
/// The wall clock timestamp of the last write in this summary
pub time_of_last_write: DateTime<Utc>,
/// The minimum row timestamp for data in this summary
pub min_timestamp: DateTime<Utc>,
/// The maximum row timestamp value for data in this summary
pub max_timestamp: DateTime<Utc>,
/// The number of rows in this summary
pub row_count: usize,
}

View File

@ -7,7 +7,7 @@ use std::{
use chrono::{DateTime, TimeZone, Utc};
use data_types::partition_metadata::PartitionAddr;
use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary};
use entry::Sequence;
use internal_types::guard::{ReadGuard, ReadLock};
@ -45,6 +45,16 @@ pub struct PersistenceWindows {
late_arrival_period: Duration,
closed_window_period: Duration,
/// The datetime this PersistenceWindows was created
///
/// `PersistenceWindows` internally uses monotonic `Instant`, however,
/// these cannot be rendered. To provide a stable rendering of Wall timestamp,
/// a single timestamp is recorded at creation time
created_at_time: DateTime<Utc>,
/// The instant this PersistenceWindows was created
created_at_instant: Instant,
/// The last instant passed to PersistenceWindows::add_range
last_instant: Instant,
@ -106,6 +116,9 @@ impl PersistenceWindows {
let closed_window_count = late_arrival_seconds / closed_window_seconds;
let created_at_time = Utc::now();
let created_at_instant = Instant::now();
Self {
persistable: ReadLock::new(None),
closed: VecDeque::with_capacity(closed_window_count as usize),
@ -113,7 +126,9 @@ impl PersistenceWindows {
addr,
late_arrival_period,
closed_window_period,
last_instant: Instant::now(),
created_at_time,
created_at_instant,
last_instant: created_at_instant,
max_sequence_numbers: Default::default(),
}
}
@ -165,7 +180,7 @@ impl PersistenceWindows {
self.rotate(received_at);
match self.open.as_mut() {
Some(w) => w.add_range(sequence, row_count, min_time, max_time),
Some(w) => w.add_range(sequence, row_count, min_time, max_time, received_at),
None => {
self.open = Some(Window::new(
received_at,
@ -335,6 +350,34 @@ impl PersistenceWindows {
self.windows().next()
}
/// Returns approximate summaries of the unpersisted writes contained
/// recorded by this PersistenceWindow instance
///
/// These are approximate because persistence may partially flush a window, which will
/// update the min row timestamp but not the row count
pub fn summaries(&self) -> impl Iterator<Item = WriteSummary> + '_ {
self.windows().map(move |window| {
let window_age = chrono::Duration::from_std(
window.created_at.duration_since(self.created_at_instant),
)
.expect("duration overflow");
let time_of_first_write = self.created_at_time + window_age;
let window_duration =
chrono::Duration::from_std(window.last_instant.duration_since(window.created_at))
.expect("duration overflow");
WriteSummary {
time_of_first_write,
time_of_last_write: time_of_first_write + window_duration,
min_timestamp: window.min_time,
max_timestamp: window.max_time,
row_count: window.row_count,
}
})
}
/// Returns true if this PersistenceWindows instance is empty
pub fn is_empty(&self) -> bool {
self.minimum_window().is_none()
@ -374,9 +417,14 @@ struct Window {
/// The server time when this window was created. Used to determine how long data in this
/// window has been sitting in memory.
created_at: Instant,
/// The server time of the last write to this window
last_instant: Instant,
/// The number of rows in the window
row_count: usize,
min_time: DateTime<Utc>, // min time value for data in the window
max_time: DateTime<Utc>, // max time value for data in the window
/// min time value for data in the window
min_time: DateTime<Utc>,
/// max time value for data in the window
max_time: DateTime<Utc>,
/// maps sequencer_id to the minimum and maximum sequence numbers seen
sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
}
@ -399,6 +447,7 @@ impl Window {
Self {
created_at,
last_instant: created_at,
row_count,
min_time,
max_time,
@ -414,7 +463,11 @@ impl Window {
row_count: usize,
min_time: DateTime<Utc>,
max_time: DateTime<Utc>,
instant: Instant,
) {
assert!(self.created_at <= instant);
self.last_instant = instant;
self.row_count += row_count;
if self.min_time > min_time {
self.min_time = min_time;
@ -1265,4 +1318,89 @@ mod tests {
assert_eq!(w.closed[1].max_time, start + chrono::Duration::seconds(2));
assert_eq!(w.closed[1].row_count, 11);
}
#[test]
fn test_summaries() {
let mut w = make_windows(Duration::from_secs(100));
let instant = w.created_at_instant;
// Window 1
w.add_range(
Some(&Sequence { id: 1, number: 1 }),
11,
Utc.timestamp_nanos(10),
Utc.timestamp_nanos(11),
instant + Duration::from_millis(1),
);
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
4,
Utc.timestamp_nanos(10),
Utc.timestamp_nanos(340),
instant + Duration::from_millis(30),
);
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
6,
Utc.timestamp_nanos(1),
Utc.timestamp_nanos(5),
instant + Duration::from_millis(50),
);
// More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 1 => Window 2
w.add_range(
Some(&Sequence { id: 1, number: 4 }),
3,
Utc.timestamp_nanos(89),
Utc.timestamp_nanos(90),
instant + DEFAULT_CLOSED_WINDOW_PERIOD + Duration::from_millis(1),
);
// More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 2 => Window 3
w.add_range(
Some(&Sequence { id: 1, number: 5 }),
8,
Utc.timestamp_nanos(3),
Utc.timestamp_nanos(4),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3,
);
let closed_duration = chrono::Duration::from_std(DEFAULT_CLOSED_WINDOW_PERIOD).unwrap();
let summaries: Vec<_> = w.summaries().collect();
assert_eq!(summaries.len(), 3);
assert_eq!(
summaries,
vec![
WriteSummary {
time_of_first_write: w.created_at_time + chrono::Duration::milliseconds(1),
time_of_last_write: w.created_at_time + chrono::Duration::milliseconds(50),
min_timestamp: Utc.timestamp_nanos(1),
max_timestamp: Utc.timestamp_nanos(340),
row_count: 21
},
WriteSummary {
time_of_first_write: w.created_at_time
+ closed_duration
+ chrono::Duration::milliseconds(1),
time_of_last_write: w.created_at_time
+ closed_duration
+ chrono::Duration::milliseconds(1),
min_timestamp: Utc.timestamp_nanos(89),
max_timestamp: Utc.timestamp_nanos(90),
row_count: 3
},
WriteSummary {
time_of_first_write: w.created_at_time + closed_duration * 3,
time_of_last_write: w.created_at_time + closed_duration * 3,
min_timestamp: Utc.timestamp_nanos(3),
max_timestamp: Utc.timestamp_nanos(4),
row_count: 8
},
]
)
}
}