diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 5b66a593f8..957d034d06 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -499,6 +499,10 @@ impl Window { /// Add one window to another. Used to collapse closed windows into persisted. fn add_window(&mut self, other: Self) { + assert!(self.last_instant <= other.created_at); + assert!(self.last_instant <= other.last_instant); + + self.last_instant = other.last_instant; self.row_count += other.row_count; if self.min_time > other.min_time { self.min_time = other.min_time; @@ -1327,7 +1331,8 @@ mod tests { #[test] fn test_summaries() { - let mut w = make_windows(Duration::from_secs(100)); + let late_arrival_period = Duration::from_secs(100); + let mut w = make_windows(late_arrival_period); let instant = w.created_at_instant; // Window 1 @@ -1407,6 +1412,34 @@ mod tests { row_count: 8 }, ] - ) + ); + + // Rotate first and second windows into persistable + w.rotate(instant + late_arrival_period + DEFAULT_CLOSED_WINDOW_PERIOD * 2); + + let summaries: Vec<_> = w.summaries().collect(); + + assert_eq!(summaries.len(), 2); + assert_eq!( + summaries, + vec![ + WriteSummary { + time_of_first_write: w.created_at_time + chrono::Duration::milliseconds(1), + time_of_last_write: w.created_at_time + + closed_duration + + chrono::Duration::milliseconds(1), + min_timestamp: Utc.timestamp_nanos(1), + max_timestamp: Utc.timestamp_nanos(340), + row_count: 24 + }, + WriteSummary { + time_of_first_write: w.created_at_time + closed_duration * 3, + time_of_last_write: w.created_at_time + closed_duration * 3, + min_timestamp: Utc.timestamp_nanos(3), + max_timestamp: Utc.timestamp_nanos(4), + row_count: 8 + }, + ] + ); } }