refactor: persistence windows row counts are non-zero

pull/24376/head
Marco Neumann 2021-08-09 10:00:28 +02:00
parent 8f06652308
commit 57bbae7e34
2 changed files with 92 additions and 89 deletions

View File

@ -1,6 +1,7 @@
//! In memory structures for tracking data ingest and when persistence can or should occur.
use std::{
collections::{btree_map::Entry, BTreeMap, VecDeque},
num::NonZeroUsize,
sync::Arc,
time::{Duration, Instant},
};
@ -147,7 +148,7 @@ impl PersistenceWindows {
pub fn add_range(
&mut self,
sequence: Option<&Sequence>,
row_count: usize,
row_count: NonZeroUsize,
min_time: DateTime<Utc>,
max_time: DateTime<Utc>,
received_at: Instant,
@ -318,14 +319,13 @@ impl PersistenceWindows {
for w in self.closed.iter_mut().take(closed_count) {
if w.min_time < new_min {
w.min_time = new_min;
if w.max_time < new_min {
w.row_count = 0;
}
}
}
// Drop any now empty windows
self.closed.retain(|x| x.row_count > 0);
let mut tail = self.closed.split_off(closed_count);
self.closed.retain(|w| w.max_time >= new_min);
self.closed.append(&mut tail);
}
/// Returns an iterator over the windows starting with the oldest
@ -353,7 +353,7 @@ impl PersistenceWindows {
time_of_last_write: to_approximate_datetime(window.last_instant),
min_timestamp: window.min_time,
max_timestamp: window.max_time,
row_count: window.row_count,
row_count: window.row_count.get(),
})
}
@ -386,7 +386,7 @@ impl PersistenceWindows {
pub fn persistable_row_count(&self, now: Instant) -> usize {
self.windows()
.take_while(|window| window.is_persistable(now, self.late_arrival_period))
.map(|window| window.row_count)
.map(|window| window.row_count.get())
.sum()
}
}
@ -399,7 +399,7 @@ struct Window {
/// The server time of the last write to this window
last_instant: Instant,
/// The number of rows in the window
row_count: usize,
row_count: NonZeroUsize,
/// min time value for data in the window
min_time: DateTime<Utc>,
/// max time value for data in the window
@ -412,7 +412,7 @@ impl Window {
fn new(
created_at: Instant,
sequence: Option<&Sequence>,
row_count: usize,
row_count: NonZeroUsize,
min_time: DateTime<Utc>,
max_time: DateTime<Utc>,
) -> Self {
@ -439,7 +439,7 @@ impl Window {
fn add_range(
&mut self,
sequence: Option<&Sequence>,
row_count: usize,
row_count: NonZeroUsize,
min_time: DateTime<Utc>,
max_time: DateTime<Utc>,
instant: Instant,
@ -447,7 +447,8 @@ impl Window {
assert!(self.created_at <= instant);
self.last_instant = instant;
self.row_count += row_count;
self.row_count =
NonZeroUsize::new(self.row_count.get() + row_count.get()).expect("both are > 0");
if self.min_time > min_time {
self.min_time = min_time;
}
@ -476,7 +477,8 @@ impl Window {
assert!(self.last_instant <= other.last_instant);
self.last_instant = other.last_instant;
self.row_count += other.row_count;
self.row_count =
NonZeroUsize::new(self.row_count.get() + other.row_count.get()).expect("both are > 0");
if self.min_time > other.min_time {
self.min_time = other.min_time;
}
@ -535,7 +537,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 1 }),
1,
NonZeroUsize::new(1).unwrap(),
Utc::now(),
Utc::now(),
now + Duration::from_nanos(1),
@ -543,7 +545,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
1,
NonZeroUsize::new(1).unwrap(),
Utc::now(),
Utc::now(),
now,
@ -559,21 +561,21 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
1,
NonZeroUsize::new(1).unwrap(),
start_time,
Utc::now(),
i,
);
w.add_range(
Some(&Sequence { id: 1, number: 4 }),
2,
NonZeroUsize::new(2).unwrap(),
Utc::now(),
Utc::now(),
Instant::now(),
);
w.add_range(
Some(&Sequence { id: 1, number: 10 }),
1,
NonZeroUsize::new(1).unwrap(),
Utc::now(),
Utc::now(),
Instant::now(),
@ -581,7 +583,7 @@ mod tests {
let last_time = Utc::now();
w.add_range(
Some(&Sequence { id: 2, number: 23 }),
10,
NonZeroUsize::new(10).unwrap(),
Utc::now(),
last_time,
Instant::now(),
@ -593,7 +595,7 @@ mod tests {
assert_eq!(open.min_time, start_time);
assert_eq!(open.max_time, last_time);
assert_eq!(open.row_count, 14);
assert_eq!(open.row_count.get(), 14);
assert_eq!(
open.sequencer_numbers.get(&1).unwrap(),
&MinMaxSequence::new(2, 10)
@ -613,14 +615,14 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
1,
NonZeroUsize::new(1).unwrap(),
start_time,
start_time,
created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
1,
NonZeroUsize::new(1).unwrap(),
last_time,
last_time,
Instant::now(),
@ -631,7 +633,7 @@ mod tests {
let open_time = Utc::now();
w.add_range(
Some(&Sequence { id: 1, number: 6 }),
2,
NonZeroUsize::new(2).unwrap(),
last_time,
open_time,
after_close_threshold,
@ -644,12 +646,12 @@ mod tests {
closed.sequencer_numbers.get(&1).unwrap(),
&MinMaxSequence::new(2, 3)
);
assert_eq!(closed.row_count, 2);
assert_eq!(closed.row_count.get(), 2);
assert_eq!(closed.min_time, start_time);
assert_eq!(closed.max_time, last_time);
let open = w.open.unwrap();
assert_eq!(open.row_count, 2);
assert_eq!(open.row_count.get(), 2);
assert_eq!(open.min_time, last_time);
assert_eq!(open.max_time, open_time);
assert_eq!(
@ -667,7 +669,7 @@ mod tests {
let first_end = Utc::now();
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
NonZeroUsize::new(2).unwrap(),
start_time,
first_end,
created_at,
@ -679,7 +681,7 @@ mod tests {
let second_end = Utc::now();
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
3,
NonZeroUsize::new(3).unwrap(),
first_end,
second_end,
second_created_at,
@ -691,7 +693,7 @@ mod tests {
let third_end = Utc::now();
w.add_range(
Some(&Sequence { id: 1, number: 4 }),
4,
NonZeroUsize::new(4).unwrap(),
second_end,
third_end,
third_created_at,
@ -701,19 +703,19 @@ mod tests {
// confirm the two on closed and third on open
let c = w.closed.get(0).cloned().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, start_time);
assert_eq!(c.max_time, first_end);
let c = w.closed.get(1).cloned().unwrap();
assert_eq!(c.created_at, second_created_at);
assert_eq!(c.row_count, 3);
assert_eq!(c.row_count.get(), 3);
assert_eq!(c.min_time, first_end);
assert_eq!(c.max_time, second_end);
let c = w.open.clone().unwrap();
assert_eq!(c.created_at, third_created_at);
assert_eq!(c.row_count, 4);
assert_eq!(c.row_count.get(), 4);
assert_eq!(c.min_time, second_end);
assert_eq!(c.max_time, third_end);
@ -723,7 +725,7 @@ mod tests {
let fourth_end = Utc::now();
w.add_range(
Some(&Sequence { id: 1, number: 5 }),
1,
NonZeroUsize::new(1).unwrap(),
fourth_end,
fourth_end,
fourth_created_at,
@ -732,14 +734,14 @@ mod tests {
// confirm persistable has first and second
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 5);
assert_eq!(c.row_count.get(), 5);
assert_eq!(c.min_time, start_time);
assert_eq!(c.max_time, second_end);
// and the third window moved to closed
let c = w.closed.get(0).cloned().unwrap();
assert_eq!(c.created_at, third_created_at);
assert_eq!(c.row_count, 4);
assert_eq!(c.row_count.get(), 4);
assert_eq!(c.min_time, second_end);
assert_eq!(c.max_time, third_end);
@ -748,7 +750,7 @@ mod tests {
.unwrap();
w.add_range(
Some(&Sequence { id: 1, number: 9 }),
2,
NonZeroUsize::new(2).unwrap(),
Utc::now(),
Utc::now(),
fifth_created_at,
@ -756,7 +758,7 @@ mod tests {
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 10);
assert_eq!(c.row_count.get(), 10);
assert_eq!(c.min_time, start_time);
assert_eq!(c.max_time, fourth_end);
}
@ -789,21 +791,21 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
NonZeroUsize::new(2).unwrap(),
start_time,
first_end,
created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
3,
NonZeroUsize::new(3).unwrap(),
second_start,
second_end,
second_created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 5 }),
2,
NonZeroUsize::new(2).unwrap(),
third_start,
third_end,
third_created_at,
@ -813,7 +815,7 @@ mod tests {
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, start_time);
assert_eq!(c.max_time, first_end);
@ -828,13 +830,13 @@ mod tests {
assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap());
let c = &w.closed[0];
assert_eq!(c.row_count, 3);
assert_eq!(c.row_count.get(), 3);
assert_eq!(c.min_time, second_start);
assert_eq!(c.max_time, second_end);
assert_eq!(c.created_at, second_created_at);
let c = &w.closed[1];
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, third_start);
assert_eq!(c.max_time, third_end);
assert_eq!(c.created_at, third_created_at);
@ -868,21 +870,21 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
NonZeroUsize::new(2).unwrap(),
start_time,
first_end,
created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
3,
NonZeroUsize::new(3).unwrap(),
second_start,
second_end,
second_created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 5 }),
2,
NonZeroUsize::new(2).unwrap(),
third_start,
third_end,
third_created_at,
@ -892,7 +894,7 @@ mod tests {
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, start_time);
assert_eq!(c.max_time, first_end);
@ -910,13 +912,13 @@ mod tests {
// the first closed window should have a min time truncated by the flush
let c = &w.closed[0];
assert_eq!(c.row_count, 3);
assert_eq!(c.row_count.get(), 3);
assert_eq!(c.min_time, truncated_time);
assert_eq!(c.max_time, second_end);
assert_eq!(c.created_at, second_created_at);
let c = &w.closed[1];
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, third_start);
assert_eq!(c.max_time, third_end);
assert_eq!(c.created_at, third_created_at);
@ -947,21 +949,21 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
NonZeroUsize::new(2).unwrap(),
start_time,
first_end,
created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
3,
NonZeroUsize::new(3).unwrap(),
first_end,
second_end,
second_created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 5 }),
2,
NonZeroUsize::new(2).unwrap(),
third_start,
third_end,
third_created_at,
@ -971,7 +973,7 @@ mod tests {
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, start_time);
assert_eq!(c.max_time, first_end);
@ -993,7 +995,7 @@ mod tests {
// the closed window should have a min time equal to the flush
let c = &w.closed[0];
assert_eq!(c.row_count, 3);
assert_eq!(c.row_count.get(), 3);
assert_eq!(c.min_time, flushed_time);
assert_eq!(c.max_time, second_end);
assert_eq!(c.created_at, second_created_at);
@ -1001,7 +1003,7 @@ mod tests {
// the open window should have been closed as part of creating the flush
// handle and then truncated by the flush timestamp
let c = &w.closed[1];
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, flushed_time);
assert_eq!(c.max_time, third_end);
assert_eq!(c.created_at, third_created_at);
@ -1033,21 +1035,21 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
NonZeroUsize::new(2).unwrap(),
start_time,
first_end,
created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
3,
NonZeroUsize::new(3).unwrap(),
second_start,
second_end,
second_created_at,
);
w.add_range(
Some(&Sequence { id: 1, number: 5 }),
2,
NonZeroUsize::new(2).unwrap(),
third_start,
third_end,
third_created_at,
@ -1055,7 +1057,7 @@ mod tests {
let c = w.persistable.as_ref().unwrap();
assert_eq!(c.created_at, created_at);
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, start_time);
assert_eq!(c.max_time, first_end);
@ -1078,7 +1080,7 @@ mod tests {
// the closed window should have a min time equal to the flush
let c = &w.closed[0];
assert_eq!(c.row_count, 3);
assert_eq!(c.row_count.get(), 3);
assert_eq!(c.min_time, flushed_time);
assert_eq!(c.max_time, second_end);
assert_eq!(c.created_at, second_created_at);
@ -1086,7 +1088,7 @@ mod tests {
// the open window should have been closed as part of creating the flush
// handle and then truncated by the flush timestamp
let c = &w.closed[1];
assert_eq!(c.row_count, 2);
assert_eq!(c.row_count.get(), 2);
assert_eq!(c.min_time, flushed_time);
assert_eq!(c.max_time, third_end);
assert_eq!(c.created_at, third_created_at);
@ -1101,7 +1103,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
NonZeroUsize::new(2).unwrap(),
start,
start + chrono::Duration::seconds(2),
instant,
@ -1109,7 +1111,7 @@ mod tests {
w.rotate(instant + Duration::from_secs(120));
assert!(w.persistable.is_some());
assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2);
assert_eq!(
w.persistable.as_ref().unwrap().max_time,
start + chrono::Duration::seconds(2)
@ -1117,7 +1119,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 4 }),
5,
NonZeroUsize::new(5).unwrap(),
start,
start + chrono::Duration::seconds(4),
instant + Duration::from_secs(120),
@ -1136,7 +1138,7 @@ mod tests {
// This should not rotate into persistable as active flush guard
w.rotate(instant + Duration::from_secs(240));
assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2);
let flush_t = guard.timestamp();
assert_eq!(flush_t, start + chrono::Duration::seconds(2));
@ -1174,7 +1176,7 @@ mod tests {
// This should rotate into persistable
w.rotate(instant + Duration::from_secs(240));
assert_eq!(w.persistable.as_ref().unwrap().row_count, 5);
assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5);
assert_eq!(w.persistable.as_ref().unwrap().min_time, truncated_time);
@ -1189,7 +1191,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 9 }),
9,
NonZeroUsize::new(9).unwrap(),
start,
start + chrono::Duration::seconds(2),
instant + Duration::from_secs(240),
@ -1201,12 +1203,12 @@ mod tests {
// This should not rotate into persistable as active flush guard
w.rotate(instant + Duration::from_secs(360));
assert_eq!(w.persistable.as_ref().unwrap().row_count, 5);
assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5);
std::mem::drop(guard);
// This should rotate into persistable
w.rotate(instant + Duration::from_secs(360));
assert_eq!(w.persistable.as_ref().unwrap().row_count, 5 + 9);
assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5 + 9);
assert_eq!(w.persistable.as_ref().unwrap().min_time, start);
}
@ -1219,7 +1221,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
2,
NonZeroUsize::new(2).unwrap(),
start,
start + chrono::Duration::seconds(2),
instant,
@ -1227,7 +1229,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 6 }),
5,
NonZeroUsize::new(5).unwrap(),
start,
start + chrono::Duration::seconds(4),
instant + DEFAULT_CLOSED_WINDOW_PERIOD,
@ -1235,7 +1237,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 9 }),
9,
NonZeroUsize::new(9).unwrap(),
start,
start + chrono::Duration::seconds(2),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 2,
@ -1243,16 +1245,16 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 10 }),
17,
NonZeroUsize::new(17).unwrap(),
start,
start + chrono::Duration::seconds(2),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3,
);
assert_eq!(w.closed.len(), 2);
assert_eq!(w.closed[0].row_count, 5);
assert_eq!(w.closed[1].row_count, 9);
assert_eq!(w.open.as_ref().unwrap().row_count, 17);
assert_eq!(w.closed[0].row_count.get(), 5);
assert_eq!(w.closed[1].row_count.get(), 9);
assert_eq!(w.open.as_ref().unwrap().row_count.get(), 17);
let flush = w
.flush_handle(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3)
@ -1265,11 +1267,11 @@ mod tests {
assert_eq!(flush_t, start + chrono::Duration::seconds(2));
let truncated_time = flush_t + chrono::Duration::nanoseconds(1);
assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2);
w.add_range(
Some(&Sequence { id: 1, number: 14 }),
11,
NonZeroUsize::new(11).unwrap(),
start,
start + chrono::Duration::seconds(2),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 4,
@ -1278,7 +1280,7 @@ mod tests {
w.rotate(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 5);
// Despite time passing persistable window shouldn't have changed due to flush guard
assert_eq!(w.persistable.as_ref().unwrap().row_count, 2);
assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2);
assert_eq!(w.closed.len(), 4);
// The flush checkpoint should not include the latest write nor those being persisted
@ -1321,7 +1323,7 @@ mod tests {
);
assert_eq!(w.closed[0].min_time, truncated_time);
assert_eq!(w.closed[0].max_time, start + chrono::Duration::seconds(4));
assert_eq!(w.closed[0].row_count, 5);
assert_eq!(w.closed[0].row_count.get(), 5);
// Window created after flush handle - should be left alone
assert_eq!(
@ -1330,7 +1332,7 @@ mod tests {
);
assert_eq!(w.closed[1].min_time, start);
assert_eq!(w.closed[1].max_time, start + chrono::Duration::seconds(2));
assert_eq!(w.closed[1].row_count, 11);
assert_eq!(w.closed[1].row_count.get(), 11);
}
#[test]
@ -1343,7 +1345,7 @@ mod tests {
// Window 1
w.add_range(
Some(&Sequence { id: 1, number: 1 }),
11,
NonZeroUsize::new(11).unwrap(),
Utc.timestamp_nanos(10),
Utc.timestamp_nanos(11),
instant + Duration::from_millis(1),
@ -1351,7 +1353,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
4,
NonZeroUsize::new(4).unwrap(),
Utc.timestamp_nanos(10),
Utc.timestamp_nanos(340),
instant + Duration::from_millis(30),
@ -1359,7 +1361,7 @@ mod tests {
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
6,
NonZeroUsize::new(6).unwrap(),
Utc.timestamp_nanos(1),
Utc.timestamp_nanos(5),
instant + Duration::from_millis(50),
@ -1368,7 +1370,7 @@ mod tests {
// More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 1 => Window 2
w.add_range(
Some(&Sequence { id: 1, number: 4 }),
3,
NonZeroUsize::new(3).unwrap(),
Utc.timestamp_nanos(89),
Utc.timestamp_nanos(90),
instant + DEFAULT_CLOSED_WINDOW_PERIOD + Duration::from_millis(1),
@ -1377,7 +1379,7 @@ mod tests {
// More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 2 => Window 3
w.add_range(
Some(&Sequence { id: 1, number: 5 }),
8,
NonZeroUsize::new(8).unwrap(),
Utc.timestamp_nanos(3),
Utc.timestamp_nanos(4),
instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3,

View File

@ -44,6 +44,7 @@ use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{
any::Any,
collections::HashMap,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
@ -1184,11 +1185,11 @@ impl Db {
for write in partitioned_writes {
let partition_key = write.key();
for table_batch in write.table_batches() {
let row_count = table_batch.row_count();
let row_count = match NonZeroUsize::new(table_batch.row_count()) {
Some(row_count) => row_count,
None => continue,
};
if row_count == 0 {
continue;
}
if !filter_table_batch(sequence, partition_key, &table_batch) {
continue;
}
@ -1227,7 +1228,7 @@ impl Db {
// At this point this should not be possible
ensure!(
timestamp_summary.stats.total_count == row_count as u64,
timestamp_summary.stats.total_count == row_count.get() as u64,
TableBatchMissingTimes {}
);