From de236c5a6fc7edf5acb3169a9483655766f30451 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 24 Jun 2021 17:50:20 -0400 Subject: [PATCH] feat: update persistence windows to support late arrival less than 30 seconds --- mutable_buffer/src/persistence_windows.rs | 110 ++++++++++++---------- server/src/db.rs | 8 +- 2 files changed, 63 insertions(+), 55 deletions(-) diff --git a/mutable_buffer/src/persistence_windows.rs b/mutable_buffer/src/persistence_windows.rs index 20cd39231b..e917bc8d55 100644 --- a/mutable_buffer/src/persistence_windows.rs +++ b/mutable_buffer/src/persistence_windows.rs @@ -7,21 +7,8 @@ use std::{ }; use chrono::{DateTime, Utc}; -use snafu::Snafu; -#[derive(Debug, Copy, Clone, Snafu)] -pub enum Error { - #[snafu(display( - "Late arrival window {:#?} too short. Minimum value should be >= {:#?}", - value, - CLOSED_WINDOW_PERIOD - ))] - ArrivalWindowTooShort { value: Duration }, -} - -pub type Result = std::result::Result; - -const CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30); +const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30); /// PersistenceWindows keep track of ingested data within a partition to determine when it /// can be persisted. This allows IOx to receive out of order writes (in their timestamps) while @@ -36,28 +23,29 @@ pub struct PersistenceWindows { closed: VecDeque, open: Option, late_arrival_period: Duration, + closed_window_period: Duration, } impl PersistenceWindows { - pub fn new(late_arrival_period: Duration) -> Result { - let late_arrival_seconds = late_arrival_period.as_secs(); - let closed_window_seconds = CLOSED_WINDOW_PERIOD.as_secs(); + pub fn new(late_arrival_period: Duration) -> Self { + let closed_window_period = if late_arrival_period > DEFAULT_CLOSED_WINDOW_PERIOD { + DEFAULT_CLOSED_WINDOW_PERIOD + } else { + late_arrival_period + }; - if late_arrival_seconds < closed_window_seconds { - return ArrivalWindowTooShort { - value: late_arrival_period, - } - .fail(); - } + let late_arrival_seconds = late_arrival_period.as_secs(); + let closed_window_seconds = closed_window_period.as_secs(); let closed_window_count = late_arrival_seconds / closed_window_seconds; - Ok(Self { + Self { persistable: None, closed: VecDeque::with_capacity(closed_window_count as usize), open: None, late_arrival_period, - }) + closed_window_period, + } } /// Updates the windows with the information from a batch of rows from a single sequencer @@ -125,7 +113,7 @@ impl PersistenceWindows { let rotate = self .open .as_ref() - .map(|w| now.duration_since(w.created_at) >= CLOSED_WINDOW_PERIOD) + .map(|w| now.duration_since(w.created_at) >= self.closed_window_period) .unwrap_or(false); if rotate { @@ -291,7 +279,7 @@ mod tests { #[test] fn starts_open_window() { - let mut w = PersistenceWindows::new(Duration::from_secs(60)).unwrap(); + let mut w = PersistenceWindows::new(Duration::from_secs(60)); let i = Instant::now(); let start_time = Utc::now(); @@ -340,7 +328,7 @@ mod tests { #[test] fn closes_open_window() { - let mut w = PersistenceWindows::new(Duration::from_secs(60)).unwrap(); + let mut w = PersistenceWindows::new(Duration::from_secs(60)); let created_at = Instant::now(); let start_time = Utc::now(); let last_time = Utc::now(); @@ -359,7 +347,9 @@ mod tests { last_time, Instant::now(), ); - let after_close_threshold = created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); + let after_close_threshold = created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); let open_time = Utc::now(); w.add_range( &Sequence { id: 1, number: 6 }, @@ -392,7 +382,7 @@ mod tests { #[test] fn moves_to_persistable() { - let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap(); + let mut w = PersistenceWindows::new(Duration::from_secs(120)); let created_at = Instant::now(); let start_time = Utc::now(); @@ -405,7 +395,9 @@ mod tests { created_at, ); - let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); + let second_created_at = created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); let second_end = Utc::now(); w.add_range( &Sequence { id: 1, number: 3 }, @@ -415,7 +407,9 @@ mod tests { second_created_at, ); - let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); + let third_created_at = second_created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); let third_end = Utc::now(); w.add_range( &Sequence { id: 1, number: 4 }, @@ -446,7 +440,7 @@ mod tests { assert_eq!(c.max_time, third_end); let fourth_created_at = third_created_at - .checked_add(CLOSED_WINDOW_PERIOD * 3) + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3) .unwrap(); let fourth_end = Utc::now(); w.add_range( @@ -475,7 +469,7 @@ mod tests { assert_eq!(c.max_time, third_end); let fifth_created_at = fourth_created_at - .checked_add(CLOSED_WINDOW_PERIOD * 100) + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 100) .unwrap(); w.add_range( &Sequence { id: 1, number: 9 }, @@ -497,14 +491,20 @@ mod tests { #[test] fn flush_persistable_keeps_open_and_closed() { - let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap(); + let mut w = PersistenceWindows::new(Duration::from_secs(120)); // these instants represent when the server received the data. Here we have a window that // should be in the persistable group, a closed window, and an open window that is closed on flush. let created_at = Instant::now(); - let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD * 2).unwrap(); - let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); - let end_at = third_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); + let second_created_at = created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 2) + .unwrap(); + let third_created_at = second_created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); + let end_at = third_created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); // these times represent the value of the time column for the rows of data. Here we have // non-overlapping windows. @@ -569,14 +569,20 @@ mod tests { #[test] fn flush_persistable_overlaps_closed() { - let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap(); + let mut w = PersistenceWindows::new(Duration::from_secs(120)); // these instants represent when data is received by the server. Here we have a persistable // window followed by two closed windows. let created_at = Instant::now(); - let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD * 2).unwrap(); - let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); - let end_at = third_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); + let second_created_at = created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 2) + .unwrap(); + let third_created_at = second_created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); + let end_at = third_created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); // the times of the rows of data. this will create overlapping windows where persistable // overlaps with the oldest closed window. @@ -642,13 +648,17 @@ mod tests { #[test] fn flush_persistable_overlaps_open() { - let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap(); + let mut w = PersistenceWindows::new(Duration::from_secs(120)); // these instants represent when data is received by the server. Here we have a persistable // window followed by two closed windows. let created_at = Instant::now(); - let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD * 3).unwrap(); - let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); + let second_created_at = created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3) + .unwrap(); + let third_created_at = second_created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); let end_at = third_created_at.checked_add(Duration::new(1, 0)).unwrap(); // the times of the rows of data. this will create overlapping windows where persistable @@ -715,13 +725,17 @@ mod tests { #[test] fn flush_persistable_overlaps_open_and_closed() { - let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap(); + let mut w = PersistenceWindows::new(Duration::from_secs(120)); // these instants represent when data is received by the server. Here we have a persistable // window followed by two closed windows. let created_at = Instant::now(); - let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD * 3).unwrap(); - let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap(); + let second_created_at = created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3) + .unwrap(); + let third_created_at = second_created_at + .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) + .unwrap(); let end_at = third_created_at.checked_add(Duration::new(1, 0)).unwrap(); // the times of the rows of data. this will create overlapping windows where persistable diff --git a/server/src/db.rs b/server/src/db.rs index 466a1b97f1..b14b8a4687 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -171,11 +171,6 @@ pub enum Error { #[snafu(display("error finding min/max time on table batch: {}", source))] TableBatchTimeError { source: entry::Error }, - - #[snafu(display("error creating persistence windows: {}", source))] - PersistenceWindowsError { - source: mutable_buffer::persistence_windows::Error, - }, } pub type Result = std::result::Result; @@ -1032,8 +1027,7 @@ impl Db { ); } None => { - let mut windows = PersistenceWindows::new(late_arrival_window) - .context(PersistenceWindowsError)?; + let mut windows = PersistenceWindows::new(late_arrival_window); windows.add_range( sequenced_entry.as_ref().sequence(), row_count,