feat: update persistence windows to support late arrival less than 30 seconds
parent
435b4b6a94
commit
de236c5a6f
|
@ -7,21 +7,8 @@ use std::{
|
|||
};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use snafu::Snafu;
|
||||
|
||||
#[derive(Debug, Copy, Clone, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display(
|
||||
"Late arrival window {:#?} too short. Minimum value should be >= {:#?}",
|
||||
value,
|
||||
CLOSED_WINDOW_PERIOD
|
||||
))]
|
||||
ArrivalWindowTooShort { value: Duration },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
const CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30);
|
||||
const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30);
|
||||
|
||||
/// PersistenceWindows keep track of ingested data within a partition to determine when it
|
||||
/// can be persisted. This allows IOx to receive out of order writes (in their timestamps) while
|
||||
|
@ -36,28 +23,29 @@ pub struct PersistenceWindows {
|
|||
closed: VecDeque<Window>,
|
||||
open: Option<Window>,
|
||||
late_arrival_period: Duration,
|
||||
closed_window_period: Duration,
|
||||
}
|
||||
|
||||
impl PersistenceWindows {
|
||||
pub fn new(late_arrival_period: Duration) -> Result<Self> {
|
||||
let late_arrival_seconds = late_arrival_period.as_secs();
|
||||
let closed_window_seconds = CLOSED_WINDOW_PERIOD.as_secs();
|
||||
pub fn new(late_arrival_period: Duration) -> Self {
|
||||
let closed_window_period = if late_arrival_period > DEFAULT_CLOSED_WINDOW_PERIOD {
|
||||
DEFAULT_CLOSED_WINDOW_PERIOD
|
||||
} else {
|
||||
late_arrival_period
|
||||
};
|
||||
|
||||
if late_arrival_seconds < closed_window_seconds {
|
||||
return ArrivalWindowTooShort {
|
||||
value: late_arrival_period,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
let late_arrival_seconds = late_arrival_period.as_secs();
|
||||
let closed_window_seconds = closed_window_period.as_secs();
|
||||
|
||||
let closed_window_count = late_arrival_seconds / closed_window_seconds;
|
||||
|
||||
Ok(Self {
|
||||
Self {
|
||||
persistable: None,
|
||||
closed: VecDeque::with_capacity(closed_window_count as usize),
|
||||
open: None,
|
||||
late_arrival_period,
|
||||
})
|
||||
closed_window_period,
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the windows with the information from a batch of rows from a single sequencer
|
||||
|
@ -125,7 +113,7 @@ impl PersistenceWindows {
|
|||
let rotate = self
|
||||
.open
|
||||
.as_ref()
|
||||
.map(|w| now.duration_since(w.created_at) >= CLOSED_WINDOW_PERIOD)
|
||||
.map(|w| now.duration_since(w.created_at) >= self.closed_window_period)
|
||||
.unwrap_or(false);
|
||||
|
||||
if rotate {
|
||||
|
@ -291,7 +279,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn starts_open_window() {
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(60)).unwrap();
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(60));
|
||||
|
||||
let i = Instant::now();
|
||||
let start_time = Utc::now();
|
||||
|
@ -340,7 +328,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn closes_open_window() {
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(60)).unwrap();
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(60));
|
||||
let created_at = Instant::now();
|
||||
let start_time = Utc::now();
|
||||
let last_time = Utc::now();
|
||||
|
@ -359,7 +347,9 @@ mod tests {
|
|||
last_time,
|
||||
Instant::now(),
|
||||
);
|
||||
let after_close_threshold = created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let after_close_threshold = created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
let open_time = Utc::now();
|
||||
w.add_range(
|
||||
&Sequence { id: 1, number: 6 },
|
||||
|
@ -392,7 +382,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn moves_to_persistable() {
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap();
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120));
|
||||
let created_at = Instant::now();
|
||||
let start_time = Utc::now();
|
||||
|
||||
|
@ -405,7 +395,9 @@ mod tests {
|
|||
created_at,
|
||||
);
|
||||
|
||||
let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let second_created_at = created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
let second_end = Utc::now();
|
||||
w.add_range(
|
||||
&Sequence { id: 1, number: 3 },
|
||||
|
@ -415,7 +407,9 @@ mod tests {
|
|||
second_created_at,
|
||||
);
|
||||
|
||||
let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let third_created_at = second_created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
let third_end = Utc::now();
|
||||
w.add_range(
|
||||
&Sequence { id: 1, number: 4 },
|
||||
|
@ -446,7 +440,7 @@ mod tests {
|
|||
assert_eq!(c.max_time, third_end);
|
||||
|
||||
let fourth_created_at = third_created_at
|
||||
.checked_add(CLOSED_WINDOW_PERIOD * 3)
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3)
|
||||
.unwrap();
|
||||
let fourth_end = Utc::now();
|
||||
w.add_range(
|
||||
|
@ -475,7 +469,7 @@ mod tests {
|
|||
assert_eq!(c.max_time, third_end);
|
||||
|
||||
let fifth_created_at = fourth_created_at
|
||||
.checked_add(CLOSED_WINDOW_PERIOD * 100)
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 100)
|
||||
.unwrap();
|
||||
w.add_range(
|
||||
&Sequence { id: 1, number: 9 },
|
||||
|
@ -497,14 +491,20 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn flush_persistable_keeps_open_and_closed() {
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap();
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120));
|
||||
|
||||
// these instants represent when the server received the data. Here we have a window that
|
||||
// should be in the persistable group, a closed window, and an open window that is closed on flush.
|
||||
let created_at = Instant::now();
|
||||
let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD * 2).unwrap();
|
||||
let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let end_at = third_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let second_created_at = created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 2)
|
||||
.unwrap();
|
||||
let third_created_at = second_created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
let end_at = third_created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
|
||||
// these times represent the value of the time column for the rows of data. Here we have
|
||||
// non-overlapping windows.
|
||||
|
@ -569,14 +569,20 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn flush_persistable_overlaps_closed() {
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap();
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120));
|
||||
|
||||
// these instants represent when data is received by the server. Here we have a persistable
|
||||
// window followed by two closed windows.
|
||||
let created_at = Instant::now();
|
||||
let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD * 2).unwrap();
|
||||
let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let end_at = third_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let second_created_at = created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 2)
|
||||
.unwrap();
|
||||
let third_created_at = second_created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
let end_at = third_created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
|
||||
// the times of the rows of data. this will create overlapping windows where persistable
|
||||
// overlaps with the oldest closed window.
|
||||
|
@ -642,13 +648,17 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn flush_persistable_overlaps_open() {
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap();
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120));
|
||||
|
||||
// these instants represent when data is received by the server. Here we have a persistable
|
||||
// window followed by two closed windows.
|
||||
let created_at = Instant::now();
|
||||
let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD * 3).unwrap();
|
||||
let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let second_created_at = created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3)
|
||||
.unwrap();
|
||||
let third_created_at = second_created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
let end_at = third_created_at.checked_add(Duration::new(1, 0)).unwrap();
|
||||
|
||||
// the times of the rows of data. this will create overlapping windows where persistable
|
||||
|
@ -715,13 +725,17 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn flush_persistable_overlaps_open_and_closed() {
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120)).unwrap();
|
||||
let mut w = PersistenceWindows::new(Duration::from_secs(120));
|
||||
|
||||
// these instants represent when data is received by the server. Here we have a persistable
|
||||
// window followed by two closed windows.
|
||||
let created_at = Instant::now();
|
||||
let second_created_at = created_at.checked_add(CLOSED_WINDOW_PERIOD * 3).unwrap();
|
||||
let third_created_at = second_created_at.checked_add(CLOSED_WINDOW_PERIOD).unwrap();
|
||||
let second_created_at = created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3)
|
||||
.unwrap();
|
||||
let third_created_at = second_created_at
|
||||
.checked_add(DEFAULT_CLOSED_WINDOW_PERIOD)
|
||||
.unwrap();
|
||||
let end_at = third_created_at.checked_add(Duration::new(1, 0)).unwrap();
|
||||
|
||||
// the times of the rows of data. this will create overlapping windows where persistable
|
||||
|
|
|
@ -171,11 +171,6 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("error finding min/max time on table batch: {}", source))]
|
||||
TableBatchTimeError { source: entry::Error },
|
||||
|
||||
#[snafu(display("error creating persistence windows: {}", source))]
|
||||
PersistenceWindowsError {
|
||||
source: mutable_buffer::persistence_windows::Error,
|
||||
},
|
||||
}
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
|
@ -1032,8 +1027,7 @@ impl Db {
|
|||
);
|
||||
}
|
||||
None => {
|
||||
let mut windows = PersistenceWindows::new(late_arrival_window)
|
||||
.context(PersistenceWindowsError)?;
|
||||
let mut windows = PersistenceWindows::new(late_arrival_window);
|
||||
windows.add_range(
|
||||
sequenced_entry.as_ref().sequence(),
|
||||
row_count,
|
||||
|
|
Loading…
Reference in New Issue