diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index fb2f4fcda2..78535360d4 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -1,6 +1,7 @@ //! In memory structures for tracking data ingest and when persistence can or should occur. use std::{ collections::{btree_map::Entry, BTreeMap, VecDeque}, + num::NonZeroUsize, sync::Arc, time::{Duration, Instant}, }; @@ -147,7 +148,7 @@ impl PersistenceWindows { pub fn add_range( &mut self, sequence: Option<&Sequence>, - row_count: usize, + row_count: NonZeroUsize, min_time: DateTime, max_time: DateTime, received_at: Instant, @@ -318,14 +319,13 @@ impl PersistenceWindows { for w in self.closed.iter_mut().take(closed_count) { if w.min_time < new_min { w.min_time = new_min; - if w.max_time < new_min { - w.row_count = 0; - } } } // Drop any now empty windows - self.closed.retain(|x| x.row_count > 0); + let mut tail = self.closed.split_off(closed_count); + self.closed.retain(|w| w.max_time >= new_min); + self.closed.append(&mut tail); } /// Returns an iterator over the windows starting with the oldest @@ -353,7 +353,7 @@ impl PersistenceWindows { time_of_last_write: to_approximate_datetime(window.last_instant), min_timestamp: window.min_time, max_timestamp: window.max_time, - row_count: window.row_count, + row_count: window.row_count.get(), }) } @@ -386,7 +386,7 @@ impl PersistenceWindows { pub fn persistable_row_count(&self, now: Instant) -> usize { self.windows() .take_while(|window| window.is_persistable(now, self.late_arrival_period)) - .map(|window| window.row_count) + .map(|window| window.row_count.get()) .sum() } } @@ -399,7 +399,7 @@ struct Window { /// The server time of the last write to this window last_instant: Instant, /// The number of rows in the window - row_count: usize, + row_count: NonZeroUsize, /// min time value for data in the window min_time: DateTime, /// max time value for data in the window @@ -412,7 +412,7 @@ impl Window { fn new( created_at: Instant, sequence: Option<&Sequence>, - row_count: usize, + row_count: NonZeroUsize, min_time: DateTime, max_time: DateTime, ) -> Self { @@ -439,7 +439,7 @@ impl Window { fn add_range( &mut self, sequence: Option<&Sequence>, - row_count: usize, + row_count: NonZeroUsize, min_time: DateTime, max_time: DateTime, instant: Instant, @@ -447,7 +447,8 @@ impl Window { assert!(self.created_at <= instant); self.last_instant = instant; - self.row_count += row_count; + self.row_count = + NonZeroUsize::new(self.row_count.get() + row_count.get()).expect("both are > 0"); if self.min_time > min_time { self.min_time = min_time; } @@ -476,7 +477,8 @@ impl Window { assert!(self.last_instant <= other.last_instant); self.last_instant = other.last_instant; - self.row_count += other.row_count; + self.row_count = + NonZeroUsize::new(self.row_count.get() + other.row_count.get()).expect("both are > 0"); if self.min_time > other.min_time { self.min_time = other.min_time; } @@ -535,7 +537,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 1 }), - 1, + NonZeroUsize::new(1).unwrap(), Utc::now(), Utc::now(), now + Duration::from_nanos(1), @@ -543,7 +545,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 1, + NonZeroUsize::new(1).unwrap(), Utc::now(), Utc::now(), now, @@ -559,21 +561,21 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 1, + NonZeroUsize::new(1).unwrap(), start_time, Utc::now(), i, ); w.add_range( Some(&Sequence { id: 1, number: 4 }), - 2, + NonZeroUsize::new(2).unwrap(), Utc::now(), Utc::now(), Instant::now(), ); w.add_range( Some(&Sequence { id: 1, number: 10 }), - 1, + NonZeroUsize::new(1).unwrap(), Utc::now(), Utc::now(), Instant::now(), @@ -581,7 +583,7 @@ mod tests { let last_time = Utc::now(); w.add_range( Some(&Sequence { id: 2, number: 23 }), - 10, + NonZeroUsize::new(10).unwrap(), Utc::now(), last_time, Instant::now(), @@ -593,7 +595,7 @@ mod tests { assert_eq!(open.min_time, start_time); assert_eq!(open.max_time, last_time); - assert_eq!(open.row_count, 14); + assert_eq!(open.row_count.get(), 14); assert_eq!( open.sequencer_numbers.get(&1).unwrap(), &MinMaxSequence::new(2, 10) @@ -613,14 +615,14 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 1, + NonZeroUsize::new(1).unwrap(), start_time, start_time, created_at, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), - 1, + NonZeroUsize::new(1).unwrap(), last_time, last_time, Instant::now(), @@ -631,7 +633,7 @@ mod tests { let open_time = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 6 }), - 2, + NonZeroUsize::new(2).unwrap(), last_time, open_time, after_close_threshold, @@ -644,12 +646,12 @@ mod tests { closed.sequencer_numbers.get(&1).unwrap(), &MinMaxSequence::new(2, 3) ); - assert_eq!(closed.row_count, 2); + assert_eq!(closed.row_count.get(), 2); assert_eq!(closed.min_time, start_time); assert_eq!(closed.max_time, last_time); let open = w.open.unwrap(); - assert_eq!(open.row_count, 2); + assert_eq!(open.row_count.get(), 2); assert_eq!(open.min_time, last_time); assert_eq!(open.max_time, open_time); assert_eq!( @@ -667,7 +669,7 @@ mod tests { let first_end = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 2 }), - 2, + NonZeroUsize::new(2).unwrap(), start_time, first_end, created_at, @@ -679,7 +681,7 @@ mod tests { let second_end = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 3 }), - 3, + NonZeroUsize::new(3).unwrap(), first_end, second_end, second_created_at, @@ -691,7 +693,7 @@ mod tests { let third_end = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 4 }), - 4, + NonZeroUsize::new(4).unwrap(), second_end, third_end, third_created_at, @@ -701,19 +703,19 @@ mod tests { // confirm the two on closed and third on open let c = w.closed.get(0).cloned().unwrap(); assert_eq!(c.created_at, created_at); - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); let c = w.closed.get(1).cloned().unwrap(); assert_eq!(c.created_at, second_created_at); - assert_eq!(c.row_count, 3); + assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, first_end); assert_eq!(c.max_time, second_end); let c = w.open.clone().unwrap(); assert_eq!(c.created_at, third_created_at); - assert_eq!(c.row_count, 4); + assert_eq!(c.row_count.get(), 4); assert_eq!(c.min_time, second_end); assert_eq!(c.max_time, third_end); @@ -723,7 +725,7 @@ mod tests { let fourth_end = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 5 }), - 1, + NonZeroUsize::new(1).unwrap(), fourth_end, fourth_end, fourth_created_at, @@ -732,14 +734,14 @@ mod tests { // confirm persistable has first and second let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); - assert_eq!(c.row_count, 5); + assert_eq!(c.row_count.get(), 5); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, second_end); // and the third window moved to closed let c = w.closed.get(0).cloned().unwrap(); assert_eq!(c.created_at, third_created_at); - assert_eq!(c.row_count, 4); + assert_eq!(c.row_count.get(), 4); assert_eq!(c.min_time, second_end); assert_eq!(c.max_time, third_end); @@ -748,7 +750,7 @@ mod tests { .unwrap(); w.add_range( Some(&Sequence { id: 1, number: 9 }), - 2, + NonZeroUsize::new(2).unwrap(), Utc::now(), Utc::now(), fifth_created_at, @@ -756,7 +758,7 @@ mod tests { let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); - assert_eq!(c.row_count, 10); + assert_eq!(c.row_count.get(), 10); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, fourth_end); } @@ -789,21 +791,21 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 2, + NonZeroUsize::new(2).unwrap(), start_time, first_end, created_at, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), - 3, + NonZeroUsize::new(3).unwrap(), second_start, second_end, second_created_at, ); w.add_range( Some(&Sequence { id: 1, number: 5 }), - 2, + NonZeroUsize::new(2).unwrap(), third_start, third_end, third_created_at, @@ -813,7 +815,7 @@ mod tests { let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); @@ -828,13 +830,13 @@ mod tests { assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); let c = &w.closed[0]; - assert_eq!(c.row_count, 3); + assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, second_start); assert_eq!(c.max_time, second_end); assert_eq!(c.created_at, second_created_at); let c = &w.closed[1]; - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, third_start); assert_eq!(c.max_time, third_end); assert_eq!(c.created_at, third_created_at); @@ -868,21 +870,21 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 2, + NonZeroUsize::new(2).unwrap(), start_time, first_end, created_at, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), - 3, + NonZeroUsize::new(3).unwrap(), second_start, second_end, second_created_at, ); w.add_range( Some(&Sequence { id: 1, number: 5 }), - 2, + NonZeroUsize::new(2).unwrap(), third_start, third_end, third_created_at, @@ -892,7 +894,7 @@ mod tests { let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); @@ -910,13 +912,13 @@ mod tests { // the first closed window should have a min time truncated by the flush let c = &w.closed[0]; - assert_eq!(c.row_count, 3); + assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, truncated_time); assert_eq!(c.max_time, second_end); assert_eq!(c.created_at, second_created_at); let c = &w.closed[1]; - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, third_start); assert_eq!(c.max_time, third_end); assert_eq!(c.created_at, third_created_at); @@ -947,21 +949,21 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 2, + NonZeroUsize::new(2).unwrap(), start_time, first_end, created_at, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), - 3, + NonZeroUsize::new(3).unwrap(), first_end, second_end, second_created_at, ); w.add_range( Some(&Sequence { id: 1, number: 5 }), - 2, + NonZeroUsize::new(2).unwrap(), third_start, third_end, third_created_at, @@ -971,7 +973,7 @@ mod tests { let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); @@ -993,7 +995,7 @@ mod tests { // the closed window should have a min time equal to the flush let c = &w.closed[0]; - assert_eq!(c.row_count, 3); + assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, flushed_time); assert_eq!(c.max_time, second_end); assert_eq!(c.created_at, second_created_at); @@ -1001,7 +1003,7 @@ mod tests { // the open window should have been closed as part of creating the flush // handle and then truncated by the flush timestamp let c = &w.closed[1]; - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, flushed_time); assert_eq!(c.max_time, third_end); assert_eq!(c.created_at, third_created_at); @@ -1033,21 +1035,21 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 2, + NonZeroUsize::new(2).unwrap(), start_time, first_end, created_at, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), - 3, + NonZeroUsize::new(3).unwrap(), second_start, second_end, second_created_at, ); w.add_range( Some(&Sequence { id: 1, number: 5 }), - 2, + NonZeroUsize::new(2).unwrap(), third_start, third_end, third_created_at, @@ -1055,7 +1057,7 @@ mod tests { let c = w.persistable.as_ref().unwrap(); assert_eq!(c.created_at, created_at); - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); @@ -1078,7 +1080,7 @@ mod tests { // the closed window should have a min time equal to the flush let c = &w.closed[0]; - assert_eq!(c.row_count, 3); + assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, flushed_time); assert_eq!(c.max_time, second_end); assert_eq!(c.created_at, second_created_at); @@ -1086,7 +1088,7 @@ mod tests { // the open window should have been closed as part of creating the flush // handle and then truncated by the flush timestamp let c = &w.closed[1]; - assert_eq!(c.row_count, 2); + assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, flushed_time); assert_eq!(c.max_time, third_end); assert_eq!(c.created_at, third_created_at); @@ -1101,7 +1103,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 2, + NonZeroUsize::new(2).unwrap(), start, start + chrono::Duration::seconds(2), instant, @@ -1109,7 +1111,7 @@ mod tests { w.rotate(instant + Duration::from_secs(120)); assert!(w.persistable.is_some()); - assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); + assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2); assert_eq!( w.persistable.as_ref().unwrap().max_time, start + chrono::Duration::seconds(2) @@ -1117,7 +1119,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 4 }), - 5, + NonZeroUsize::new(5).unwrap(), start, start + chrono::Duration::seconds(4), instant + Duration::from_secs(120), @@ -1136,7 +1138,7 @@ mod tests { // This should not rotate into persistable as active flush guard w.rotate(instant + Duration::from_secs(240)); - assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); + assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2); let flush_t = guard.timestamp(); assert_eq!(flush_t, start + chrono::Duration::seconds(2)); @@ -1174,7 +1176,7 @@ mod tests { // This should rotate into persistable w.rotate(instant + Duration::from_secs(240)); - assert_eq!(w.persistable.as_ref().unwrap().row_count, 5); + assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5); assert_eq!(w.persistable.as_ref().unwrap().min_time, truncated_time); @@ -1189,7 +1191,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 9 }), - 9, + NonZeroUsize::new(9).unwrap(), start, start + chrono::Duration::seconds(2), instant + Duration::from_secs(240), @@ -1201,12 +1203,12 @@ mod tests { // This should not rotate into persistable as active flush guard w.rotate(instant + Duration::from_secs(360)); - assert_eq!(w.persistable.as_ref().unwrap().row_count, 5); + assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5); std::mem::drop(guard); // This should rotate into persistable w.rotate(instant + Duration::from_secs(360)); - assert_eq!(w.persistable.as_ref().unwrap().row_count, 5 + 9); + assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5 + 9); assert_eq!(w.persistable.as_ref().unwrap().min_time, start); } @@ -1219,7 +1221,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 2, + NonZeroUsize::new(2).unwrap(), start, start + chrono::Duration::seconds(2), instant, @@ -1227,7 +1229,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 6 }), - 5, + NonZeroUsize::new(5).unwrap(), start, start + chrono::Duration::seconds(4), instant + DEFAULT_CLOSED_WINDOW_PERIOD, @@ -1235,7 +1237,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 9 }), - 9, + NonZeroUsize::new(9).unwrap(), start, start + chrono::Duration::seconds(2), instant + DEFAULT_CLOSED_WINDOW_PERIOD * 2, @@ -1243,16 +1245,16 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 10 }), - 17, + NonZeroUsize::new(17).unwrap(), start, start + chrono::Duration::seconds(2), instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3, ); assert_eq!(w.closed.len(), 2); - assert_eq!(w.closed[0].row_count, 5); - assert_eq!(w.closed[1].row_count, 9); - assert_eq!(w.open.as_ref().unwrap().row_count, 17); + assert_eq!(w.closed[0].row_count.get(), 5); + assert_eq!(w.closed[1].row_count.get(), 9); + assert_eq!(w.open.as_ref().unwrap().row_count.get(), 17); let flush = w .flush_handle(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3) @@ -1265,11 +1267,11 @@ mod tests { assert_eq!(flush_t, start + chrono::Duration::seconds(2)); let truncated_time = flush_t + chrono::Duration::nanoseconds(1); - assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); + assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2); w.add_range( Some(&Sequence { id: 1, number: 14 }), - 11, + NonZeroUsize::new(11).unwrap(), start, start + chrono::Duration::seconds(2), instant + DEFAULT_CLOSED_WINDOW_PERIOD * 4, @@ -1278,7 +1280,7 @@ mod tests { w.rotate(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 5); // Despite time passing persistable window shouldn't have changed due to flush guard - assert_eq!(w.persistable.as_ref().unwrap().row_count, 2); + assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2); assert_eq!(w.closed.len(), 4); // The flush checkpoint should not include the latest write nor those being persisted @@ -1321,7 +1323,7 @@ mod tests { ); assert_eq!(w.closed[0].min_time, truncated_time); assert_eq!(w.closed[0].max_time, start + chrono::Duration::seconds(4)); - assert_eq!(w.closed[0].row_count, 5); + assert_eq!(w.closed[0].row_count.get(), 5); // Window created after flush handle - should be left alone assert_eq!( @@ -1330,7 +1332,7 @@ mod tests { ); assert_eq!(w.closed[1].min_time, start); assert_eq!(w.closed[1].max_time, start + chrono::Duration::seconds(2)); - assert_eq!(w.closed[1].row_count, 11); + assert_eq!(w.closed[1].row_count.get(), 11); } #[test] @@ -1343,7 +1345,7 @@ mod tests { // Window 1 w.add_range( Some(&Sequence { id: 1, number: 1 }), - 11, + NonZeroUsize::new(11).unwrap(), Utc.timestamp_nanos(10), Utc.timestamp_nanos(11), instant + Duration::from_millis(1), @@ -1351,7 +1353,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 2 }), - 4, + NonZeroUsize::new(4).unwrap(), Utc.timestamp_nanos(10), Utc.timestamp_nanos(340), instant + Duration::from_millis(30), @@ -1359,7 +1361,7 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 3 }), - 6, + NonZeroUsize::new(6).unwrap(), Utc.timestamp_nanos(1), Utc.timestamp_nanos(5), instant + Duration::from_millis(50), @@ -1368,7 +1370,7 @@ mod tests { // More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 1 => Window 2 w.add_range( Some(&Sequence { id: 1, number: 4 }), - 3, + NonZeroUsize::new(3).unwrap(), Utc.timestamp_nanos(89), Utc.timestamp_nanos(90), instant + DEFAULT_CLOSED_WINDOW_PERIOD + Duration::from_millis(1), @@ -1377,7 +1379,7 @@ mod tests { // More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 2 => Window 3 w.add_range( Some(&Sequence { id: 1, number: 5 }), - 8, + NonZeroUsize::new(8).unwrap(), Utc.timestamp_nanos(3), Utc.timestamp_nanos(4), instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3, diff --git a/server/src/db.rs b/server/src/db.rs index eadd80cad2..95a038d6c5 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -44,6 +44,7 @@ use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{ any::Any, collections::HashMap, + num::NonZeroUsize, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, @@ -1184,11 +1185,11 @@ impl Db { for write in partitioned_writes { let partition_key = write.key(); for table_batch in write.table_batches() { - let row_count = table_batch.row_count(); + let row_count = match NonZeroUsize::new(table_batch.row_count()) { + Some(row_count) => row_count, + None => continue, + }; - if row_count == 0 { - continue; - } if !filter_table_batch(sequence, partition_key, &table_batch) { continue; } @@ -1227,7 +1228,7 @@ impl Db { // At this point this should not be possible ensure!( - timestamp_summary.stats.total_count == row_count as u64, + timestamp_summary.stats.total_count == row_count.get() as u64, TableBatchMissingTimes {} );