fix: update last_instant when rotating into persistable window (#2067)
parent
091837420f
commit
61da0fe4df
|
@ -499,6 +499,10 @@ impl Window {
|
|||
|
||||
/// Add one window to another. Used to collapse closed windows into persisted.
|
||||
fn add_window(&mut self, other: Self) {
|
||||
assert!(self.last_instant <= other.created_at);
|
||||
assert!(self.last_instant <= other.last_instant);
|
||||
|
||||
self.last_instant = other.last_instant;
|
||||
self.row_count += other.row_count;
|
||||
if self.min_time > other.min_time {
|
||||
self.min_time = other.min_time;
|
||||
|
@ -1327,7 +1331,8 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_summaries() {
|
||||
let mut w = make_windows(Duration::from_secs(100));
|
||||
let late_arrival_period = Duration::from_secs(100);
|
||||
let mut w = make_windows(late_arrival_period);
|
||||
let instant = w.created_at_instant;
|
||||
|
||||
// Window 1
|
||||
|
@ -1407,6 +1412,34 @@ mod tests {
|
|||
row_count: 8
|
||||
},
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
// Rotate first and second windows into persistable
|
||||
w.rotate(instant + late_arrival_period + DEFAULT_CLOSED_WINDOW_PERIOD * 2);
|
||||
|
||||
let summaries: Vec<_> = w.summaries().collect();
|
||||
|
||||
assert_eq!(summaries.len(), 2);
|
||||
assert_eq!(
|
||||
summaries,
|
||||
vec![
|
||||
WriteSummary {
|
||||
time_of_first_write: w.created_at_time + chrono::Duration::milliseconds(1),
|
||||
time_of_last_write: w.created_at_time
|
||||
+ closed_duration
|
||||
+ chrono::Duration::milliseconds(1),
|
||||
min_timestamp: Utc.timestamp_nanos(1),
|
||||
max_timestamp: Utc.timestamp_nanos(340),
|
||||
row_count: 24
|
||||
},
|
||||
WriteSummary {
|
||||
time_of_first_write: w.created_at_time + closed_duration * 3,
|
||||
time_of_last_write: w.created_at_time + closed_duration * 3,
|
||||
min_timestamp: Utc.timestamp_nanos(3),
|
||||
max_timestamp: Utc.timestamp_nanos(4),
|
||||
row_count: 8
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue