diff --git a/Cargo.lock b/Cargo.lock index 8ea4c65dcd..7d0107e218 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3174,6 +3174,7 @@ dependencies = [ "arrow", "arrow_util", "async-trait", + "chrono", "data_types", "datafusion 0.1.0", "internal_types", diff --git a/lifecycle/Cargo.toml b/lifecycle/Cargo.toml index 919031a441..258251c346 100644 --- a/lifecycle/Cargo.toml +++ b/lifecycle/Cargo.toml @@ -14,6 +14,6 @@ internal_types = { path = "../internal_types" } observability_deps = { path = "../observability_deps" } tokio = { version = "1.11", features = ["macros", "time"] } tracker = { path = "../tracker" } -[dev-dependencies] +[dev-dependencies] tokio = { version = "1.11", features = ["macros", "time", "rt"] } diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs index b8dc7f831f..8907c87551 100644 --- a/lifecycle/src/lib.rs +++ b/lifecycle/src/lib.rs @@ -15,7 +15,6 @@ use data_types::{ DatabaseName, }; use internal_types::access::AccessMetrics; -use std::time::Instant; use tracker::TaskTracker; mod guard; @@ -86,7 +85,7 @@ pub trait LockablePartition: Sized + std::fmt::Display { /// write has been present in memory fn prepare_persist( partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, - now: Instant, + now: DateTime<Utc>, ) -> Option<Self::PersistHandle>; /// Split and persist chunks. @@ -158,10 +157,10 @@ pub trait LifecyclePartition { /// /// `now` is the wall clock time that should be used to compute how long a given /// write has been present in memory - fn persistable_row_count(&self, now: Instant) -> usize; + fn persistable_row_count(&self, now: DateTime<Utc>) -> usize; /// Returns the age of the oldest unpersisted write - fn minimum_unpersisted_age(&self) -> Option<Instant>; + fn minimum_unpersisted_age(&self) -> Option<DateTime<Utc>>; } /// The lifecycle operates on chunks implementing this trait diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index e96d789028..3f8c248ce3 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -82,8 +82,6 @@ where /// - If persist is `true` it will only unload persisted chunks in order of creation time, starting with the oldest. /// - If persist is `false` it will consider all chunks, also in order of creation time, starting with the oldest. /// - /// TODO: use LRU instead of creation time - /// fn maybe_free_memory( &mut self, db_name: &DatabaseName<'static>, @@ -343,7 +341,7 @@ where db_name: &DatabaseName<'static>, partition: &P, rules: &LifecycleRules, - now: Instant, + now: DateTime<Utc>, ) -> bool { // TODO: Encapsulate locking into a CatalogTransaction type let partition = partition.read(); @@ -353,19 +351,15 @@ where return false; } - let persistable_age_seconds = partition + let persistable_age_seconds: u32 = partition .minimum_unpersisted_age() .and_then(|minimum_unpersisted_age| { - // If writes happened between when the policy loop - // started and this check is done, the duration may be - // negative. Skip persistence in this case to avoid - // panic in `duration_since` - Some( - now.checked_duration_since(minimum_unpersisted_age)?
- .as_secs(), - ) + (now - minimum_unpersisted_age) + .num_seconds() + .try_into() + .ok() }) - .unwrap_or_default() as u32; + .unwrap_or_default(); let persistable_row_count = partition.persistable_row_count(now); debug!(%db_name, %partition, @@ -512,8 +506,7 @@ where // but persistence cannot proceed because of in-progress // compactions let stall_compaction_persisting = if rules.persist && !self.suppress_persistence { - let persisting = - self.maybe_persist_chunks(&db_name, partition, &rules, now_instant); + let persisting = self.maybe_persist_chunks(&db_name, partition, &rules, now); if persisting { debug!(%db_name, %partition, reason="persisting", "stalling compaction"); } @@ -672,6 +665,7 @@ mod tests { ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk, LockablePartition, PersistHandle, }; + use chrono::TimeZone; use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder, ChunkStorage}; use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions; use std::{ @@ -695,7 +689,7 @@ mod tests { struct TestPartition { chunks: BTreeMap>)>, persistable_row_count: usize, - minimum_unpersisted_age: Option, + minimum_unpersisted_age: Option>, max_persistable_timestamp: Option>, next_id: ChunkId, } @@ -704,7 +698,7 @@ mod tests { fn with_persistence( self, persistable_row_count: usize, - minimum_unpersisted_age: Instant, + minimum_unpersisted_age: DateTime, max_persistable_timestamp: DateTime, ) -> Self { Self { @@ -890,7 +884,7 @@ mod tests { fn prepare_persist( partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, - _now: Instant, + _now: DateTime, ) -> Option { Some(TestPersistHandle { timestamp: partition.max_persistable_timestamp.unwrap(), @@ -983,11 +977,11 @@ mod tests { false } - fn persistable_row_count(&self, _now: Instant) -> usize { + fn persistable_row_count(&self, _now: DateTime) -> usize { self.persistable_row_count } - fn minimum_unpersisted_age(&self) -> Option { + fn minimum_unpersisted_age(&self) -> Option> { self.minimum_unpersisted_age } } @@ -1112,7 +1106,7 @@ mod tests { } fn from_secs(secs: i64) -> DateTime { - DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc) + Utc.timestamp(secs, 0) } #[test] @@ -1619,7 +1613,8 @@ mod tests { max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()), ..Default::default() }; - let now = Instant::now(); + let now = from_secs(0); + let now_instant = Instant::now(); let partitions = vec![ // Insufficient rows and not old enough => don't persist but can compact @@ -1648,7 +1643,7 @@ mod tests { TestChunk::new(ChunkId::new(6), 0, ChunkStorage::ObjectStoreOnly) .with_min_timestamp(from_secs(5)), ]) - .with_persistence(10, now - Duration::from_secs(10), from_secs(20)), + .with_persistence(10, now - chrono::Duration::seconds(10), from_secs(20)), // Sufficient rows but conflicting compaction => prevent compaction TestPartition::new(vec![ TestChunk::new(ChunkId::new(7), 0, ChunkStorage::ClosedMutableBuffer) @@ -1690,7 +1685,7 @@ mod tests { let db = TestDb::from_partitions(rules, partitions); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(0), now); + lifecycle.check_for_work(from_secs(0), now_instant); assert_eq!( *db.events.read(), vec![ @@ -1714,21 +1709,22 @@ mod tests { persist_age_threshold_seconds: NonZeroU32::new(20).unwrap(), ..Default::default() }; - let now = Instant::now(); + let now = Utc::now(); + let now_instant = Instant::now(); // This could occur if the in-memory contents of a partition are deleted, 
and // compaction causes the chunks to be removed. In such a scenario the persistence // windows will still think there are rows to be persisted let partitions = vec![TestPartition::new(vec![]).with_persistence( 10, - now - Duration::from_secs(20), + now - chrono::Duration::seconds(20), from_secs(20), )]; let db = TestDb::from_partitions(rules, partitions); let mut lifecycle = LifecyclePolicy::new(&db); - lifecycle.check_for_work(from_secs(0), now); + lifecycle.check_for_work(now, now_instant); assert_eq!(*db.events.read(), vec![MoverEvents::Persist(vec![]),]); } @@ -1742,7 +1738,8 @@ mod tests { max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()), ..Default::default() }; - let now = Instant::now(); + let now = Utc::now(); + let now_instant = Instant::now(); let partitions = vec![ // Sufficient rows => could persist but should be suppressed @@ -1758,13 +1755,13 @@ mod tests { let db = TestDb::from_partitions(rules, partitions); let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db); - lifecycle.check_for_work(from_secs(0), now); + lifecycle.check_for_work(now, now_instant); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ChunkId::new(2), ChunkId::new(3)]),] ); - lifecycle.check_for_work(from_secs(0), now); + lifecycle.check_for_work(now, now_instant); assert_eq!( *db.events.read(), vec![MoverEvents::Compact(vec![ChunkId::new(2), ChunkId::new(3)]),] @@ -1772,7 +1769,7 @@ mod tests { lifecycle.unsuppress_persistence(); - lifecycle.check_for_work(from_secs(0), now); + lifecycle.check_for_work(now, now_instant); assert_eq!( *db.events.read(), vec![ diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 46a4c7d1bc..55e7428b73 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -4,10 +4,10 @@ use std::{ num::NonZeroUsize, ops::Deref, sync::Arc, - time::{Duration, Instant}, + time::Duration as StdDuration, }; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary}; use entry::Sequence; @@ -15,9 +15,8 @@ use internal_types::freezable::{Freezable, FreezeHandle}; use crate::min_max_sequence::MinMaxSequence; use crate::{checkpoint::PartitionCheckpoint, min_max_sequence::OptionalMinMaxSequence}; -use data_types::instant::to_approximate_datetime; -const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30); +const DEFAULT_CLOSED_WINDOW_SECONDS: i64 = 30; /// PersistenceWindows keep track of ingested data within a partition to determine when it /// can be persisted. 
This allows IOx to receive out of order writes (in their timestamps) while @@ -29,11 +28,11 @@ const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30); /// /// The `PersistenceWindows` operate on two different types of time /// -/// * row timestamps - these are `DateTime<Utc>` and are the row's value for the `time` column -/// * Wall timestamps - these are `Instant` and are the Wall clock of the system used to determine +/// * row timestamps - these are the row's value for the `time` column +/// * Wall timestamps - these are the Wall clock of the system used to determine /// the "age" of a set of writes within a PersistenceWindow /// -/// To aid testing Wall timestamps are passed to many methods instead of directly using `Instant::now` +/// To aid testing Wall timestamps are passed to many methods instead of directly using `Utc::now` /// /// The PersistenceWindows answer the question: - "What is the maximum row timestamp in the writes /// that arrived more than late_arrival_period seconds ago, as determined by wall clock time" @@ -46,13 +45,14 @@ pub struct PersistenceWindows { addr: PartitionAddr, late_arrival_period: Duration, + closed_window_period: Duration, /// The instant this PersistenceWindows was created - created_at: Instant, + time_of_first_write: DateTime<Utc>, - /// The last instant passed to PersistenceWindows::add_range - last_instant: Instant, + /// The maximum Wall timestamp that has been passed to PersistenceWindows::add_range + time_of_last_write: DateTime<Utc>, /// maps sequencer_id to the maximum sequence passed to PersistenceWindows::add_range max_sequence_numbers: BTreeMap, @@ -78,7 +78,7 @@ pub struct FlushHandle { /// The address of the partition addr: PartitionAddr, - /// The timestamp to flush + /// The row timestamp to flush timestamp: DateTime<Utc>, /// The sequence number ranges not including those persisted by this flush @@ -104,13 +104,13 @@ impl FlushHandle { } impl PersistenceWindows { - pub fn new(addr: PartitionAddr, late_arrival_period: Duration, now: Instant) -> Self { - let closed_window_period = late_arrival_period.min(DEFAULT_CLOSED_WINDOW_PERIOD); + pub fn new(addr: PartitionAddr, late_arrival_period: StdDuration, now: DateTime<Utc>) -> Self { + let late_arrival_period = Duration::from_std(late_arrival_period).unwrap(); + let closed_window_period = + late_arrival_period.min(Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS)); - let late_arrival_seconds = late_arrival_period.as_secs(); - let closed_window_seconds = closed_window_period.as_secs(); - - let closed_window_count = late_arrival_seconds / closed_window_seconds; + let closed_window_count = + late_arrival_period.num_seconds() / closed_window_period.num_seconds(); Self { persistable: Freezable::new(None), @@ -119,15 +119,17 @@ impl PersistenceWindows { addr, late_arrival_period, closed_window_period, - created_at: now, - last_instant: now, + time_of_first_write: now, + time_of_last_write: now, max_sequence_numbers: Default::default(), } } /// Updates the late arrival period of this `PersistenceWindows` instance - pub fn set_late_arrival_period(&mut self, late_arrival_period: Duration) { - self.closed_window_period = late_arrival_period.min(DEFAULT_CLOSED_WINDOW_PERIOD); + pub fn set_late_arrival_period(&mut self, late_arrival_period: StdDuration) { + let late_arrival_period = Duration::from_std(late_arrival_period).unwrap(); + self.closed_window_period = + late_arrival_period.min(Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS)); self.late_arrival_period = late_arrival_period; } @@ -148,12 +150,10 @@
impl PersistenceWindows { } /// Updates the windows with the information from a batch of rows from a single sequencer - /// to the same partition. The min and max times are the times on the row data. The `received_at` - /// Instant is when the data was received. Taking it in this function is really just about - /// dependency injection for testing purposes. Otherwise, this function wouldn't take that - /// parameter and just use `Instant::now()`. + /// to the same partition. `min_time` and `max_time` are row timestamps in the written data + /// and `time_of_write` is the wall time that the write was performed. /// - /// The `received_at` is used by the lifecycle manager to determine how long the data in a + /// The `time_of_write` is used by the lifecycle manager to determine how long the data in a /// persistence window has been sitting in memory. If it is over the configured threshold /// the data should be persisted. /// @@ -161,7 +161,6 @@ impl PersistenceWindows { /// is triggered (either by crossing a row count threshold or time). /// /// # Panics - /// - When the passed `received_at` is smaller than the last time this method was used (aka time goes backwards). /// - When `min_time > max_time`. pub fn add_range( &mut self, @@ -169,21 +168,17 @@ impl PersistenceWindows { row_count: NonZeroUsize, min_time: DateTime, max_time: DateTime, - received_at: Instant, + time_of_write: DateTime, ) { - assert!( - received_at >= self.last_instant, - "PersistenceWindows::add_range called out of order, received_at ({:?}) < last_instant ({:?})", - received_at, - self.last_instant, - ); + // DateTime is not monotonic + let time_of_write = self.time_of_last_write.max(time_of_write); assert!( min_time <= max_time, "PersistenceWindows::add_range called with min_time ({}) > max_time ({})", min_time, max_time ); - self.last_instant = received_at; + self.time_of_last_write = time_of_write; if let Some(sequence) = sequence { match self.max_sequence_numbers.entry(sequence.id) { @@ -203,13 +198,13 @@ impl PersistenceWindows { } } - self.rotate(received_at); + self.rotate(time_of_write); match self.open.as_mut() { - Some(w) => w.add_range(sequence, row_count, min_time, max_time, received_at), + Some(w) => w.add_range(sequence, row_count, min_time, max_time, time_of_write), None => { self.open = Some(Window::new( - received_at, + time_of_write, sequence, row_count, min_time, @@ -224,7 +219,7 @@ impl PersistenceWindows { /// `now` is the Wall clock time of the server to use for determining how "old" a given /// persistence window is, or in other words, how long since the writes it contains the /// metrics for were written to this partition - fn rotate(&mut self, now: Instant) { + fn rotate(&mut self, now: DateTime) { let rotate = self .open .as_ref() @@ -306,10 +301,15 @@ impl PersistenceWindows { .collect() } + /// Acquire a handle that flushes all unpersisted data + pub fn flush_all_handle(&mut self) -> Option { + self.flush_handle(chrono::MAX_DATETIME) + } + /// Acquire a handle that prevents mutation of the persistable window until dropped /// /// Returns `None` if there is an outstanding handle or nothing to persist - pub fn flush_handle(&mut self, now: Instant) -> Option { + pub fn flush_handle(&mut self, now: DateTime) -> Option { // Verify no active flush handles before closing open window self.persistable.get_mut()?; @@ -385,8 +385,8 @@ impl PersistenceWindows { /// update the min row timestamp but not the row count pub fn summaries(&self) -> impl Iterator + '_ { self.windows().map(move |window| 
WriteSummary { - time_of_first_write: to_approximate_datetime(window.created_at), - time_of_last_write: to_approximate_datetime(window.last_instant), + time_of_first_write: window.time_of_first_write, + time_of_last_write: window.time_of_last_write, min_timestamp: window.min_time, max_timestamp: window.max_time, row_count: window.row_count.get(), @@ -404,8 +404,8 @@ impl PersistenceWindows { } /// Returns the minimum unpersisted age - pub fn minimum_unpersisted_age(&self) -> Option { - self.minimum_window().map(|x| x.created_at) + pub fn minimum_unpersisted_age(&self) -> Option> { + self.minimum_window().map(|x| x.time_of_first_write) } /// Returns the minimum unpersisted timestamp @@ -419,7 +419,7 @@ impl PersistenceWindows { } /// Returns the number of persistable rows - pub fn persistable_row_count(&self, now: Instant) -> usize { + pub fn persistable_row_count(&self, now: DateTime) -> usize { self.windows() .take_while(|window| window.is_persistable(now, self.late_arrival_period)) .map(|window| window.row_count.get()) @@ -431,9 +431,9 @@ impl PersistenceWindows { struct Window { /// The server time when this window was created. Used to determine how long data in this /// window has been sitting in memory. - created_at: Instant, + time_of_first_write: DateTime, /// The server time of the last write to this window - last_instant: Instant, + time_of_last_write: DateTime, /// The number of rows in the window row_count: NonZeroUsize, /// min time value for data in the window @@ -446,7 +446,7 @@ struct Window { impl Window { fn new( - created_at: Instant, + time_of_write: DateTime, sequence: Option<&Sequence>, row_count: NonZeroUsize, min_time: DateTime, @@ -461,8 +461,8 @@ impl Window { } Self { - created_at, - last_instant: created_at, + time_of_first_write: time_of_write, + time_of_last_write: time_of_write, row_count, min_time, max_time, @@ -478,10 +478,11 @@ impl Window { row_count: NonZeroUsize, min_time: DateTime, max_time: DateTime, - instant: Instant, + time_of_write: DateTime, ) { - assert!(self.created_at <= instant); - self.last_instant = instant; + assert!(self.time_of_first_write <= time_of_write); + assert!(self.time_of_last_write <= time_of_write); + self.time_of_last_write = time_of_write; self.row_count = NonZeroUsize::new(self.row_count.get() + row_count.get()).expect("both are > 0"); @@ -509,10 +510,10 @@ impl Window { /// Add one window to another. Used to collapse closed windows into persisted. 
fn add_window(&mut self, other: Self) { - assert!(self.last_instant <= other.created_at); - assert!(self.last_instant <= other.last_instant); + assert!(self.time_of_last_write <= other.time_of_first_write); + assert!(self.time_of_last_write <= other.time_of_last_write); - self.last_instant = other.last_instant; + self.time_of_last_write = other.time_of_last_write; self.row_count = NonZeroUsize::new(self.row_count.get() + other.row_count.get()).expect("both are > 0"); if self.min_time > other.min_time { @@ -535,17 +536,13 @@ impl Window { } /// If this window can be closed - fn is_closeable(&self, now: Instant, closed_window_period: Duration) -> bool { - now.checked_duration_since(self.created_at) - .map(|x| x >= closed_window_period) - .unwrap_or(false) + fn is_closeable(&self, now: DateTime, closed_window_period: Duration) -> bool { + (now - self.time_of_first_write) >= closed_window_period } /// If this window is persistable - fn is_persistable(&self, now: Instant, late_arrival_period: Duration) -> bool { - now.checked_duration_since(self.created_at) - .map(|x| x >= late_arrival_period) - .unwrap_or(false) + fn is_persistable(&self, now: DateTime, late_arrival_period: Duration) -> bool { + (now - self.time_of_first_write) >= late_arrival_period } } @@ -555,7 +552,7 @@ mod tests { use super::*; - fn make_windows(late_arrival_period: Duration) -> PersistenceWindows { + fn make_windows(late_arrival_period: StdDuration) -> PersistenceWindows { PersistenceWindows::new( PartitionAddr { db_name: Arc::from("db"), @@ -563,30 +560,21 @@ mod tests { partition_key: Arc::from("partition_key"), }, late_arrival_period, - Instant::now(), + Utc::now(), ) } - // *NOTE*: this test currently fails on (at least) aarch64 architectures - // such as an Apple M1 machine. - // - // Possibly related to https://github.com/rust-lang/rust/issues/87906 but - // not clear at this point. - // - // Ignoring the tests here to get the suite green on aarch64. 
- #[cfg(not(target_arch = "aarch64"))] #[test] - #[should_panic(expected = "PersistenceWindows::add_range called out of order")] - fn panics_when_time_goes_backwards() { - let mut w = make_windows(Duration::from_secs(60)); - let now = Instant::now(); + fn time_go_backwards() { + let mut w = make_windows(StdDuration::from_secs(60)); + let now = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 1 }), NonZeroUsize::new(1).unwrap(), Utc::now(), Utc::now(), - now + Duration::from_nanos(1), + now + Duration::nanoseconds(1), ); w.add_range( @@ -601,7 +589,7 @@ mod tests { #[test] #[should_panic(expected = "PersistenceWindows::add_range called with min_time")] fn panics_when_min_time_gt_max_time() { - let mut w = make_windows(Duration::from_secs(60)); + let mut w = make_windows(StdDuration::from_secs(60)); let t = Utc::now(); w.add_range( @@ -609,54 +597,61 @@ mod tests { NonZeroUsize::new(1).unwrap(), t + chrono::Duration::nanoseconds(1), t, - Instant::now(), + Utc::now(), ); } #[test] fn starts_open_window() { - let mut w = make_windows(Duration::from_secs(60)); + let mut w = make_windows(StdDuration::from_secs(60)); - let i = Instant::now(); - let start_time = Utc::now(); + let row_t0 = Utc::now(); + let row_t1 = row_t0 + chrono::Duration::seconds(1); + let row_t2 = row_t1 + chrono::Duration::milliseconds(3); + let row_t3 = row_t2 + chrono::Duration::milliseconds(3); + let write_t0 = w.time_of_first_write; + let write_t1 = write_t0 + chrono::Duration::seconds(2); + let write_t2 = write_t1 + chrono::Duration::seconds(2); + let write_t3 = write_t2 + chrono::Duration::seconds(2); + + // Write timestamps are purposefully out of order w.add_range( Some(&Sequence { id: 1, number: 2 }), NonZeroUsize::new(1).unwrap(), - start_time, - Utc::now(), - i, + row_t0, + row_t0, + write_t0, ); w.add_range( Some(&Sequence { id: 1, number: 4 }), NonZeroUsize::new(2).unwrap(), - Utc::now(), - Utc::now(), - Instant::now(), + row_t1, + row_t1, + write_t2, ); w.add_range( Some(&Sequence { id: 1, number: 10 }), NonZeroUsize::new(1).unwrap(), - Utc::now(), - Utc::now(), - Instant::now(), + row_t2, + row_t3, + write_t3, ); - let time_before_last_time = Utc::now(); - let last_time = Utc::now(); w.add_range( Some(&Sequence { id: 2, number: 23 }), NonZeroUsize::new(10).unwrap(), - time_before_last_time, - last_time, - Instant::now(), + row_t2, + row_t3, + write_t1, ); assert!(w.closed.is_empty()); assert!(w.persistable.is_none()); let open = w.open.unwrap(); - assert_eq!(open.min_time, start_time); - assert_eq!(open.max_time, last_time); + assert_eq!(open.time_of_last_write, write_t3); + assert_eq!(open.min_time, row_t0); + assert_eq!(open.max_time, row_t3); assert_eq!(open.row_count.get(), 14); assert_eq!( open.sequencer_numbers.get(&1).unwrap(), @@ -670,52 +665,53 @@ mod tests { #[test] fn closes_open_window() { - let mut w = make_windows(Duration::from_secs(60)); - let created_at = Instant::now(); - let start_time = Utc::now(); - let last_time = Utc::now(); + let mut w = make_windows(StdDuration::from_secs(60)); + let created_at = w.time_of_first_write; + + let row_t0 = Utc.timestamp_nanos(39049493); + let row_t1 = row_t0 + Duration::seconds(3); + let row_t2 = row_t1 + Duration::milliseconds(65); w.add_range( Some(&Sequence { id: 1, number: 2 }), NonZeroUsize::new(1).unwrap(), - start_time, - start_time, + row_t0, + row_t1, created_at, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), NonZeroUsize::new(1).unwrap(), - last_time, - last_time, - Instant::now(), + row_t0, + row_t1, + created_at, ); - let 
after_close_threshold = created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); - let open_time = Utc::now(); + + let after_close_threshold = created_at + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); w.add_range( Some(&Sequence { id: 1, number: 6 }), NonZeroUsize::new(2).unwrap(), - last_time, - open_time, + row_t1, + row_t2, after_close_threshold, ); assert!(w.persistable.is_none()); + assert_eq!(w.closed.len(), 1); let closed = w.closed.get(0).unwrap(); assert_eq!( closed.sequencer_numbers.get(&1).unwrap(), &MinMaxSequence::new(2, 3) ); assert_eq!(closed.row_count.get(), 2); - assert_eq!(closed.min_time, start_time); - assert_eq!(closed.max_time, last_time); + assert_eq!(closed.min_time, row_t0); + assert_eq!(closed.max_time, row_t1); let open = w.open.unwrap(); assert_eq!(open.row_count.get(), 2); - assert_eq!(open.min_time, last_time); - assert_eq!(open.max_time, open_time); + assert_eq!(open.min_time, row_t1); + assert_eq!(open.max_time, row_t2); assert_eq!( open.sequencer_numbers.get(&1).unwrap(), &MinMaxSequence::new(6, 6) @@ -724,8 +720,8 @@ mod tests { #[test] fn moves_to_persistable() { - let mut w = make_windows(Duration::from_secs(120)); - let created_at = Instant::now(); + let mut w = make_windows(StdDuration::from_secs(120)); + let write_t0 = w.time_of_first_write; let start_time = Utc::now(); let first_end = Utc::now(); @@ -734,92 +730,84 @@ mod tests { NonZeroUsize::new(2).unwrap(), start_time, first_end, - created_at, + write_t0, ); - let second_created_at = created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); + let write_t1 = write_t0 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); let second_end = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 3 }), NonZeroUsize::new(3).unwrap(), first_end, second_end, - second_created_at, + write_t1, ); - let third_created_at = second_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); + let write_2 = write_t1 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); let third_end = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 4 }), NonZeroUsize::new(4).unwrap(), second_end, third_end, - third_created_at, + write_2, ); assert!(w.persistable.is_none()); // confirm the two on closed and third on open let c = w.closed.get(0).cloned().unwrap(); - assert_eq!(c.created_at, created_at); + assert_eq!(c.time_of_first_write, write_t0); assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); let c = w.closed.get(1).cloned().unwrap(); - assert_eq!(c.created_at, second_created_at); + assert_eq!(c.time_of_first_write, write_t1); assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, first_end); assert_eq!(c.max_time, second_end); let c = w.open.clone().unwrap(); - assert_eq!(c.created_at, third_created_at); + assert_eq!(c.time_of_first_write, write_2); assert_eq!(c.row_count.get(), 4); assert_eq!(c.min_time, second_end); assert_eq!(c.max_time, third_end); - let fourth_created_at = third_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3) - .unwrap(); + let write_t3 = write_2 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS * 3); let fourth_end = Utc::now(); w.add_range( Some(&Sequence { id: 1, number: 5 }), NonZeroUsize::new(1).unwrap(), fourth_end, fourth_end, - fourth_created_at, + write_t3, ); // confirm persistable has first and second let c = w.persistable.as_ref().unwrap(); - assert_eq!(c.created_at, created_at); + assert_eq!(c.time_of_first_write, write_t0); assert_eq!(c.row_count.get(), 5); assert_eq!(c.min_time, 
start_time); assert_eq!(c.max_time, second_end); // and the third window moved to closed let c = w.closed.get(0).cloned().unwrap(); - assert_eq!(c.created_at, third_created_at); + assert_eq!(c.time_of_first_write, write_2); assert_eq!(c.row_count.get(), 4); assert_eq!(c.min_time, second_end); assert_eq!(c.max_time, third_end); - let fifth_created_at = fourth_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 100) - .unwrap(); + let write_t4 = write_t3 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS * 100); w.add_range( Some(&Sequence { id: 1, number: 9 }), NonZeroUsize::new(2).unwrap(), Utc::now(), Utc::now(), - fifth_created_at, + write_t4, ); let c = w.persistable.as_ref().unwrap(); - assert_eq!(c.created_at, created_at); + assert_eq!(c.time_of_first_write, write_t0); assert_eq!(c.row_count.get(), 10); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, fourth_end); @@ -827,56 +815,50 @@ mod tests { #[test] fn flush_persistable_keeps_open_and_closed() { - let mut w = make_windows(Duration::from_secs(120)); + let mut w = make_windows(StdDuration::from_secs(120)); // these instants represent when the server received the data. Here we have a window that // should be in the persistable group, a closed window, and an open window that is closed on flush. - let created_at = Instant::now(); - let second_created_at = created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 2) - .unwrap(); - let third_created_at = second_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); - let end_at = third_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); + let write_t0 = w.time_of_first_write; + let write_t1 = write_t0 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS * 2); + let write_t2 = write_t1 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); + let write_t3 = write_t2 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); // these times represent the value of the time column for the rows of data. Here we have // non-overlapping windows. 
let start_time = Utc::now(); - let first_end = start_time + chrono::Duration::seconds(1); - let second_start = first_end + chrono::Duration::seconds(1); - let second_end = second_start + chrono::Duration::seconds(1); - let third_start = second_end + chrono::Duration::seconds(1); - let third_end = third_start + chrono::Duration::seconds(1); + let first_end = start_time + Duration::seconds(1); + let second_start = first_end + Duration::seconds(1); + let second_end = second_start + Duration::seconds(1); + let third_start = second_end + Duration::seconds(1); + let third_end = third_start + Duration::seconds(1); w.add_range( Some(&Sequence { id: 1, number: 2 }), NonZeroUsize::new(2).unwrap(), start_time, first_end, - created_at, + write_t0, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), NonZeroUsize::new(3).unwrap(), second_start, second_end, - second_created_at, + write_t1, ); w.add_range( Some(&Sequence { id: 1, number: 5 }), NonZeroUsize::new(2).unwrap(), third_start, third_end, - third_created_at, + write_t2, ); - w.rotate(end_at); + w.rotate(write_t3); let c = w.persistable.as_ref().unwrap(); - assert_eq!(c.created_at, created_at); + assert_eq!(c.time_of_first_write, write_t0); assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); @@ -884,7 +866,7 @@ mod tests { let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); - let handle = w.flush_handle(end_at).unwrap(); + let handle = w.flush_handle(write_t3).unwrap(); w.flush(handle); assert!(w.persistable.is_none()); @@ -895,77 +877,71 @@ mod tests { assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, second_start); assert_eq!(c.max_time, second_end); - assert_eq!(c.created_at, second_created_at); + assert_eq!(c.time_of_first_write, write_t1); let c = &w.closed[1]; assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, third_start); assert_eq!(c.max_time, third_end); - assert_eq!(c.created_at, third_created_at); + assert_eq!(c.time_of_first_write, write_t2); } #[test] fn flush_persistable_overlaps_closed() { - let mut w = make_windows(Duration::from_secs(120)); + let mut w = make_windows(StdDuration::from_secs(120)); // these instants represent when data is received by the server. Here we have a persistable // window followed by two closed windows. - let created_at = Instant::now(); - let second_created_at = created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 2) - .unwrap(); - let third_created_at = second_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); - let end_at = third_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); + let write_t0 = w.time_of_first_write; + let write_t1 = write_t0 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS * 2); + let write_t2 = write_t1 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); + let write_t3 = write_t2 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); // the times of the rows of data. this will create overlapping windows where persistable // overlaps with the oldest closed window. 
let start_time = Utc::now(); - let second_start = start_time + chrono::Duration::seconds(1); - let first_end = second_start + chrono::Duration::seconds(1); - let second_end = first_end + chrono::Duration::seconds(1); - let third_start = first_end + chrono::Duration::seconds(1); - let third_end = third_start + chrono::Duration::seconds(1); + let second_start = start_time + Duration::seconds(1); + let first_end = second_start + Duration::seconds(1); + let second_end = first_end + Duration::seconds(1); + let third_start = first_end + Duration::seconds(1); + let third_end = third_start + Duration::seconds(1); w.add_range( Some(&Sequence { id: 1, number: 2 }), NonZeroUsize::new(2).unwrap(), start_time, first_end, - created_at, + write_t0, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), NonZeroUsize::new(3).unwrap(), second_start, second_end, - second_created_at, + write_t1, ); w.add_range( Some(&Sequence { id: 1, number: 5 }), NonZeroUsize::new(2).unwrap(), third_start, third_end, - third_created_at, + write_t2, ); - w.rotate(end_at); + w.rotate(write_t3); let c = w.persistable.as_ref().unwrap(); - assert_eq!(c.created_at, created_at); + assert_eq!(c.time_of_first_write, write_t0); assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); - let flush = w.flush_handle(end_at).unwrap(); + let flush = w.flush_handle(write_t3).unwrap(); assert_eq!(flush.timestamp(), first_end); - let truncated_time = flush.timestamp() + chrono::Duration::nanoseconds(1); + let truncated_time = flush.timestamp() + Duration::nanoseconds(1); w.flush(flush); assert!(w.persistable.is_none()); @@ -977,64 +953,60 @@ mod tests { assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, truncated_time); assert_eq!(c.max_time, second_end); - assert_eq!(c.created_at, second_created_at); + assert_eq!(c.time_of_first_write, write_t1); let c = &w.closed[1]; assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, third_start); assert_eq!(c.max_time, third_end); - assert_eq!(c.created_at, third_created_at); + assert_eq!(c.time_of_first_write, write_t2); } #[test] fn flush_persistable_overlaps_open() { - let mut w = make_windows(Duration::from_secs(120)); + let mut w = make_windows(StdDuration::from_secs(120)); // these instants represent when data is received by the server. Here we have a persistable // window followed by two closed windows. - let created_at = Instant::now(); - let second_created_at = created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3) - .unwrap(); - let third_created_at = second_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); - let end_at = third_created_at.checked_add(Duration::new(1, 0)).unwrap(); + let write_t0 = w.time_of_first_write; + let write_t1 = write_t0 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS * 3); + let write_t2 = write_t1 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); + let write_t3 = write_t2 + Duration::seconds(1); // the times of the rows of data. this will create overlapping windows where persistable // overlaps with the newest open window (but not the closed one). 
let start_time = Utc::now(); - let third_start = start_time + chrono::Duration::seconds(1); - let first_end = third_start + chrono::Duration::seconds(1); - let second_end = first_end + chrono::Duration::seconds(1); - let third_end = second_end + chrono::Duration::seconds(1); + let third_start = start_time + Duration::seconds(1); + let first_end = third_start + Duration::seconds(1); + let second_end = first_end + Duration::seconds(1); + let third_end = second_end + Duration::seconds(1); w.add_range( Some(&Sequence { id: 1, number: 2 }), NonZeroUsize::new(2).unwrap(), start_time, first_end, - created_at, + write_t0, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), NonZeroUsize::new(3).unwrap(), first_end, second_end, - second_created_at, + write_t1, ); w.add_range( Some(&Sequence { id: 1, number: 5 }), NonZeroUsize::new(2).unwrap(), third_start, third_end, - third_created_at, + write_t2, ); - w.rotate(end_at); + w.rotate(write_t3); let c = w.persistable.as_ref().unwrap(); - assert_eq!(c.created_at, created_at); + assert_eq!(c.time_of_first_write, write_t0); assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); @@ -1042,7 +1014,7 @@ mod tests { let mins = w.persistable.as_ref().unwrap().sequencer_numbers.clone(); assert_eq!(mins, w.minimum_unpersisted_sequence().unwrap()); - let flush = w.flush_handle(end_at).unwrap(); + let flush = w.flush_handle(write_t3).unwrap(); assert_eq!(flush.timestamp(), first_end); assert!(w.open.is_none()); let flushed_time = flush.timestamp() + chrono::Duration::nanoseconds(1); @@ -1060,7 +1032,7 @@ mod tests { assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, flushed_time); assert_eq!(c.max_time, second_end); - assert_eq!(c.created_at, second_created_at); + assert_eq!(c.time_of_first_write, write_t1); // the open window should have been closed as part of creating the flush // handle and then truncated by the flush timestamp @@ -1068,57 +1040,53 @@ mod tests { assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, flushed_time); assert_eq!(c.max_time, third_end); - assert_eq!(c.created_at, third_created_at); + assert_eq!(c.time_of_first_write, write_t2); } #[test] fn flush_persistable_overlaps_open_and_closed() { - let mut w = make_windows(Duration::from_secs(120)); + let mut w = make_windows(StdDuration::from_secs(120)); // these instants represent when data is received by the server. Here we have a persistable // window followed by two closed windows. - let created_at = Instant::now(); - let second_created_at = created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD * 3) - .unwrap(); - let third_created_at = second_created_at - .checked_add(DEFAULT_CLOSED_WINDOW_PERIOD) - .unwrap(); - let end_at = third_created_at.checked_add(Duration::new(1, 0)).unwrap(); + let write_t0 = w.time_of_first_write; + let write_t1 = write_t0 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS * 3); + let write_t2 = write_t1 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); + let write_t3 = write_t2 + Duration::seconds(1); // the times of the rows of data. this will create overlapping windows where persistable // overlaps with the closed window and the open one. 
let start_time = Utc::now(); - let second_start = start_time + chrono::Duration::seconds(1); - let third_start = second_start + chrono::Duration::seconds(1); - let first_end = third_start + chrono::Duration::seconds(1); - let second_end = first_end + chrono::Duration::seconds(1); - let third_end = second_end + chrono::Duration::seconds(1); + let second_start = start_time + Duration::seconds(1); + let third_start = second_start + Duration::seconds(1); + let first_end = third_start + Duration::seconds(1); + let second_end = first_end + Duration::seconds(1); + let third_end = second_end + Duration::seconds(1); w.add_range( Some(&Sequence { id: 1, number: 2 }), NonZeroUsize::new(2).unwrap(), start_time, first_end, - created_at, + write_t0, ); w.add_range( Some(&Sequence { id: 1, number: 3 }), NonZeroUsize::new(3).unwrap(), second_start, second_end, - second_created_at, + write_t1, ); w.add_range( Some(&Sequence { id: 1, number: 5 }), NonZeroUsize::new(2).unwrap(), third_start, third_end, - third_created_at, + write_t2, ); let c = w.persistable.as_ref().unwrap(); - assert_eq!(c.created_at, created_at); + assert_eq!(c.time_of_first_write, write_t0); assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, start_time); assert_eq!(c.max_time, first_end); @@ -1129,10 +1097,10 @@ mod tests { // this should rotate the first window into persistable // after flush we should see no more persistable window and the closed windows // should have min timestamps equal to the previous flush end. - let flush = w.flush_handle(end_at).unwrap(); + let flush = w.flush_handle(write_t3).unwrap(); assert_eq!(flush.timestamp(), first_end); assert!(w.open.is_none()); - let flushed_time = flush.timestamp() + chrono::Duration::nanoseconds(1); + let flushed_time = flush.timestamp() + Duration::nanoseconds(1); w.flush(flush); assert!(w.persistable.is_none()); let mins = w.closed[0].sequencer_numbers.clone(); @@ -1145,7 +1113,7 @@ mod tests { assert_eq!(c.row_count.get(), 3); assert_eq!(c.min_time, flushed_time); assert_eq!(c.max_time, second_end); - assert_eq!(c.created_at, second_created_at); + assert_eq!(c.time_of_first_write, write_t1); // the open window should have been closed as part of creating the flush // handle and then truncated by the flush timestamp @@ -1153,61 +1121,65 @@ mod tests { assert_eq!(c.row_count.get(), 2); assert_eq!(c.min_time, flushed_time); assert_eq!(c.max_time, third_end); - assert_eq!(c.created_at, third_created_at); + assert_eq!(c.time_of_first_write, write_t2); } #[test] fn test_flush_guard() { - let mut w = make_windows(Duration::from_secs(120)); + let mut w = make_windows(StdDuration::from_secs(120)); + let late_arrival_period = w.late_arrival_period; - let instant = Instant::now(); - let start = Utc::now(); + // Space writes so each goes to a separate window + let write_t0 = w.time_of_first_write; + let write_t1 = write_t0 + late_arrival_period; + let write_t2 = write_t1 + late_arrival_period * 2; + + let row_t0 = Utc::now(); + let row_t1 = row_t0 + Duration::seconds(2); + let row_t2 = row_t1 + Duration::seconds(2); w.add_range( Some(&Sequence { id: 1, number: 2 }), NonZeroUsize::new(2).unwrap(), - start, - start + chrono::Duration::seconds(2), - instant, + row_t0, + row_t1, + write_t0, ); - w.rotate(instant + Duration::from_secs(120)); + w.rotate(write_t0 + late_arrival_period); assert!(w.persistable.is_some()); assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2); - assert_eq!( - w.persistable.as_ref().unwrap().max_time, - start + chrono::Duration::seconds(2) - ); + 
assert_eq!(w.persistable.as_ref().unwrap().max_time, row_t1); w.add_range( Some(&Sequence { id: 1, number: 4 }), NonZeroUsize::new(5).unwrap(), - start, - start + chrono::Duration::seconds(4), - instant + Duration::from_secs(120), + row_t0, + row_t2, + write_t1, ); // Should rotate into closed - w.rotate(instant + Duration::from_secs(120) + DEFAULT_CLOSED_WINDOW_PERIOD); + w.rotate(write_t1 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS)); assert_eq!(w.closed.len(), 1); let guard = w - .flush_handle(instant + Duration::from_secs(120) + DEFAULT_CLOSED_WINDOW_PERIOD) + .flush_handle(write_t1 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS)) .unwrap(); // Should only allow one at once - assert!(w.flush_handle(instant).is_none()); + assert!(w.flush_handle(write_t0).is_none()); // This should not rotate into persistable as active flush guard - w.rotate(instant + Duration::from_secs(240)); + w.rotate(write_t1 + late_arrival_period); assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2); let flush_t = guard.timestamp(); - assert_eq!(flush_t, start + chrono::Duration::seconds(2)); + assert_eq!(flush_t, row_t1); // Min time should have been truncated by persist operation to be // 1 nanosecond more than was persisted - let truncated_time = flush_t + chrono::Duration::nanoseconds(1); + let truncated_time = flush_t + Duration::nanoseconds(1); // The flush checkpoint should not include the writes being persisted let flush_checkpoint = guard.checkpoint(); @@ -1237,12 +1209,12 @@ mod tests { ); // This should rotate into persistable - w.rotate(instant + Duration::from_secs(240)); + w.rotate(write_t1 + late_arrival_period); assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5); assert_eq!(w.persistable.as_ref().unwrap().min_time, truncated_time); - let guard = w.flush_handle(instant + Duration::from_secs(240)).unwrap(); + let guard = w.flush_handle(write_t1 + late_arrival_period).unwrap(); // that checkpoint has an optional minimum let flush_checkpoint = guard.checkpoint(); @@ -1254,63 +1226,72 @@ mod tests { w.add_range( Some(&Sequence { id: 1, number: 9 }), NonZeroUsize::new(9).unwrap(), - start, - start + chrono::Duration::seconds(2), - instant + Duration::from_secs(240), + row_t0, + row_t0 + chrono::Duration::seconds(2), + write_t2, ); // Should rotate into closed - w.rotate(instant + Duration::from_secs(240) + DEFAULT_CLOSED_WINDOW_PERIOD); + w.rotate(write_t2 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS)); assert_eq!(w.closed.len(), 1); // This should not rotate into persistable as active flush guard - w.rotate(instant + Duration::from_secs(360)); + w.rotate(write_t2 + late_arrival_period); assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5); std::mem::drop(guard); // This should rotate into persistable - w.rotate(instant + Duration::from_secs(360)); + w.rotate(write_t2 + late_arrival_period); assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 5 + 9); - assert_eq!(w.persistable.as_ref().unwrap().min_time, start); + assert_eq!(w.persistable.as_ref().unwrap().min_time, row_t0); } #[test] fn test_flush_guard_multiple_closed() { - let mut w = make_windows(DEFAULT_CLOSED_WINDOW_PERIOD * 3); + let late_arrival_period = Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS * 3); + let mut w = make_windows(late_arrival_period.to_std().unwrap()); - let instant = Instant::now(); - let start = Utc::now(); + // Space writes so each goes to a separate window + let write_t0 = w.time_of_first_write; + let write_t1 = write_t0 + 
Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); + let write_t2 = write_t1 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); + let write_t3 = write_t2 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); + let write_t4 = write_t3 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS); + + let row_t0 = Utc::now(); + let row_t1 = row_t0 + Duration::seconds(2); + let row_t2 = row_t1 + Duration::seconds(2); w.add_range( Some(&Sequence { id: 1, number: 2 }), NonZeroUsize::new(2).unwrap(), - start, - start + chrono::Duration::seconds(2), - instant, + row_t0, + row_t1, + write_t0, ); w.add_range( Some(&Sequence { id: 1, number: 6 }), NonZeroUsize::new(5).unwrap(), - start, - start + chrono::Duration::seconds(4), - instant + DEFAULT_CLOSED_WINDOW_PERIOD, + row_t0, + row_t2, + write_t1, ); w.add_range( Some(&Sequence { id: 1, number: 9 }), NonZeroUsize::new(9).unwrap(), - start, - start + chrono::Duration::seconds(2), - instant + DEFAULT_CLOSED_WINDOW_PERIOD * 2, + row_t0, + row_t1, + write_t2, ); w.add_range( Some(&Sequence { id: 1, number: 10 }), NonZeroUsize::new(17).unwrap(), - start, - start + chrono::Duration::seconds(2), - instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3, + row_t0, + row_t1, + write_t3, ); assert_eq!(w.closed.len(), 2); @@ -1318,28 +1299,26 @@ mod tests { assert_eq!(w.closed[1].row_count.get(), 9); assert_eq!(w.open.as_ref().unwrap().row_count.get(), 17); - let flush = w - .flush_handle(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3) - .unwrap(); + let flush = w.flush_handle(write_t0 + late_arrival_period).unwrap(); let flush_t = flush.timestamp(); assert!(w.open.is_none()); assert_eq!(flush.closed_count, 3); - assert_eq!(flush_t, start + chrono::Duration::seconds(2)); - let truncated_time = flush_t + chrono::Duration::nanoseconds(1); + assert_eq!(flush_t, row_t1); + let truncated_time = flush_t + Duration::nanoseconds(1); assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2); w.add_range( Some(&Sequence { id: 1, number: 14 }), NonZeroUsize::new(11).unwrap(), - start, - start + chrono::Duration::seconds(2), - instant + DEFAULT_CLOSED_WINDOW_PERIOD * 4, + row_t0, + row_t1, + write_t4, ); - w.rotate(instant + DEFAULT_CLOSED_WINDOW_PERIOD * 5); + w.rotate(write_t4 + Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS)); // Despite time passing persistable window shouldn't have changed due to flush guard assert_eq!(w.persistable.as_ref().unwrap().row_count.get(), 2); @@ -1369,48 +1348,30 @@ mod tests { &OptionalMinMaxSequence::new(Some(6), 14) ); - // Windows from writes at - // - // - `instant + DEFAULT_CLOSED_WINDOW_PERIOD * 2` - // - `instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3` - // - // have been completely persisted by the flush + // Windows from writes 2 and 3 have been completely persisted by the flush assert!(w.persistable.is_none()); assert_eq!(w.closed.len(), 2); - assert_eq!( - w.closed[0].created_at, - instant + DEFAULT_CLOSED_WINDOW_PERIOD - ); + assert_eq!(w.closed[0].time_of_first_write, write_t1); + assert_eq!(w.closed[0].time_of_last_write, write_t1); assert_eq!(w.closed[0].min_time, truncated_time); - assert_eq!(w.closed[0].max_time, start + chrono::Duration::seconds(4)); + assert_eq!(w.closed[0].max_time, row_t2); assert_eq!(w.closed[0].row_count.get(), 5); // Window created after flush handle - should be left alone - assert_eq!( - w.closed[1].created_at, - instant + DEFAULT_CLOSED_WINDOW_PERIOD * 4 - ); - assert_eq!(w.closed[1].min_time, start); - assert_eq!(w.closed[1].max_time, start + chrono::Duration::seconds(2)); + assert_eq!(w.closed[1].time_of_first_write, 
write_t4); + assert_eq!(w.closed[1].time_of_last_write, write_t4); + assert_eq!(w.closed[1].min_time, row_t0); + assert_eq!(w.closed[1].max_time, row_t1); assert_eq!(w.closed[1].row_count.get(), 11); } - // *NOTE*: this test currently fails on (at least) aarch64 architectures - // such as an Apple M1 machine. - // - // Possibly related to https://github.com/rust-lang/rust/issues/87906 but - // not clear at this point. - // - // Ignoring the tests here to get the suite green on aarch64. - #[cfg(not(target_arch = "aarch64"))] #[test] fn test_summaries() { - let late_arrival_period = Duration::from_secs(100); - let mut w = make_windows(late_arrival_period); - let instant = w.created_at; - let created_at_time = to_approximate_datetime(w.created_at); + let late_arrival_period = Duration::seconds(100); + let mut w = make_windows(late_arrival_period.to_std().unwrap()); + let start = w.time_of_first_write; // Window 1 w.add_range( @@ -1418,7 +1379,7 @@ mod tests { NonZeroUsize::new(11).unwrap(), Utc.timestamp_nanos(10), Utc.timestamp_nanos(11), - instant + Duration::from_millis(1), + start + Duration::milliseconds(1), ); w.add_range( @@ -1426,7 +1387,7 @@ mod tests { NonZeroUsize::new(4).unwrap(), Utc.timestamp_nanos(10), Utc.timestamp_nanos(340), - instant + Duration::from_millis(30), + start + Duration::milliseconds(30), ); w.add_range( @@ -1434,7 +1395,7 @@ mod tests { NonZeroUsize::new(6).unwrap(), Utc.timestamp_nanos(1), Utc.timestamp_nanos(5), - instant + Duration::from_millis(50), + start + Duration::milliseconds(50), ); // More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 1 => Window 2 @@ -1443,7 +1404,7 @@ mod tests { NonZeroUsize::new(3).unwrap(), Utc.timestamp_nanos(89), Utc.timestamp_nanos(90), - instant + DEFAULT_CLOSED_WINDOW_PERIOD + Duration::from_millis(1), + start + w.closed_window_period + Duration::milliseconds(1), ); // More than DEFAULT_CLOSED_WINDOW_PERIOD after start of Window 2 => Window 3 @@ -1452,11 +1413,9 @@ mod tests { NonZeroUsize::new(8).unwrap(), Utc.timestamp_nanos(3), Utc.timestamp_nanos(4), - instant + DEFAULT_CLOSED_WINDOW_PERIOD * 3, + start + w.closed_window_period * 3, ); - let closed_duration = chrono::Duration::from_std(DEFAULT_CLOSED_WINDOW_PERIOD).unwrap(); - let summaries: Vec<_> = w.summaries().collect(); assert_eq!(summaries.len(), 3); @@ -1464,26 +1423,22 @@ mod tests { summaries, vec![ WriteSummary { - time_of_first_write: created_at_time + chrono::Duration::milliseconds(1), - time_of_last_write: created_at_time + chrono::Duration::milliseconds(50), + time_of_first_write: start + Duration::milliseconds(1), + time_of_last_write: start + Duration::milliseconds(50), min_timestamp: Utc.timestamp_nanos(1), max_timestamp: Utc.timestamp_nanos(340), row_count: 21 }, WriteSummary { - time_of_first_write: created_at_time - + closed_duration - + chrono::Duration::milliseconds(1), - time_of_last_write: created_at_time - + closed_duration - + chrono::Duration::milliseconds(1), + time_of_first_write: start + w.closed_window_period + Duration::milliseconds(1), + time_of_last_write: start + w.closed_window_period + Duration::milliseconds(1), min_timestamp: Utc.timestamp_nanos(89), max_timestamp: Utc.timestamp_nanos(90), row_count: 3 }, WriteSummary { - time_of_first_write: created_at_time + closed_duration * 3, - time_of_last_write: created_at_time + closed_duration * 3, + time_of_first_write: start + w.closed_window_period * 3, + time_of_last_write: start + w.closed_window_period * 3, min_timestamp: Utc.timestamp_nanos(3), max_timestamp: 
Utc.timestamp_nanos(4), row_count: 8 @@ -1492,7 +1447,7 @@ mod tests { ); // Rotate first and second windows into persistable - w.rotate(instant + late_arrival_period + DEFAULT_CLOSED_WINDOW_PERIOD * 2); + w.rotate(start + late_arrival_period + w.closed_window_period * 2); let summaries: Vec<_> = w.summaries().collect(); @@ -1501,17 +1456,15 @@ mod tests { summaries, vec![ WriteSummary { - time_of_first_write: created_at_time + chrono::Duration::milliseconds(1), - time_of_last_write: created_at_time - + closed_duration - + chrono::Duration::milliseconds(1), + time_of_first_write: start + Duration::milliseconds(1), + time_of_last_write: start + w.closed_window_period + Duration::milliseconds(1), min_timestamp: Utc.timestamp_nanos(1), max_timestamp: Utc.timestamp_nanos(340), row_count: 24 }, WriteSummary { - time_of_first_write: created_at_time + closed_duration * 3, - time_of_last_write: created_at_time + closed_duration * 3, + time_of_first_write: start + w.closed_window_period * 3, + time_of_last_write: start + w.closed_window_period * 3, min_timestamp: Utc.timestamp_nanos(3), max_timestamp: Utc.timestamp_nanos(4), row_count: 8 @@ -1522,9 +1475,9 @@ mod tests { #[test] fn test_regression_2206() { - let late_arrival_period = DEFAULT_CLOSED_WINDOW_PERIOD * 10; - let mut w = make_windows(late_arrival_period); - let mut now = w.created_at; + let late_arrival_period = Duration::seconds(DEFAULT_CLOSED_WINDOW_SECONDS * 10); + let mut w = make_windows(late_arrival_period.to_std().unwrap()); + let now = w.time_of_first_write; // window 1: to be persisted let min_time = Utc.timestamp_nanos(10); @@ -1538,7 +1491,7 @@ mod tests { ); // window 2: closed but overlaps with the persistence range - now += late_arrival_period; + let now = now + late_arrival_period; w.add_range( Some(&Sequence { id: 1, number: 4 }), NonZeroUsize::new(1).unwrap(), @@ -1559,7 +1512,7 @@ mod tests { #[test] fn test_mark_seen_and_persisted() { - let late_arrival_period = Duration::from_secs(100); + let late_arrival_period = StdDuration::from_secs(100); let mut w = make_windows(late_arrival_period); let mut sequencer_numbers1 = BTreeMap::new(); diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml index 5de4e84a9f..ede2d4f83d 100644 --- a/query_tests/Cargo.toml +++ b/query_tests/Cargo.toml @@ -9,6 +9,7 @@ description = "Tests of the query engine against different database configuratio [dependencies] async-trait = "0.1" +chrono = "0.4" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } once_cell = { version = "1.4.0", features = ["parking_lot"] } diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index 9455bdc1d8..519532dcf9 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -4,7 +4,6 @@ pub mod delete; use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; use once_cell::sync::OnceCell; @@ -162,11 +161,7 @@ impl DbSetup for NoData { // Now write the data in RB to object store but keep it in RB let chunk_id = db - .persist_partition( - "cpu", - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition("cpu", partition_key, true) .await .unwrap() .unwrap() @@ -610,13 +605,9 @@ impl DbSetup for TwoMeasurementsManyFieldsLifecycle { db.compact_open_chunk("h2o", partition_key).await.unwrap(); - db.persist_partition( - "h2o", - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition("h2o", partition_key, true) + .await + .unwrap(); write_lp( 
&db, @@ -801,13 +792,9 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> .await .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); } let scenario4 = DbScenario { scenario_name: "Data in both read buffer and object store".into(), @@ -823,11 +810,7 @@ pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> .unwrap(); let id = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap() .unwrap() @@ -914,23 +897,15 @@ pub async fn make_two_chunk_scenarios( let db = make_db().await.db; let table_names = write_lp(&db, data1).await; for table_name in &table_names { - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); } let table_names = write_lp(&db, data2).await; for table_name in &table_names { - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); } let scenario5 = DbScenario { scenario_name: "Data in two read buffer chunks and two parquet file chunks".into(), @@ -942,11 +917,7 @@ pub async fn make_two_chunk_scenarios( let table_names = write_lp(&db, data1).await; for table_name in &table_names { let id = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap() .unwrap() @@ -957,11 +928,7 @@ pub async fn make_two_chunk_scenarios( let table_names = write_lp(&db, data2).await; for table_name in &table_names { let id = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap() .unwrap() @@ -1003,13 +970,9 @@ pub async fn make_two_chunk_scenarios( /// Rollover the mutable buffer and load chunk 0 to the read buffer and object store pub async fn rollover_and_load(db: &Arc, partition_key: &str, table_name: &str) { - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); } // This function loads one chunk of lp data into RUB for testing predicate pushdown @@ -1035,11 +998,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario( let table_names = write_lp(&db, data).await; for table_name in &table_names { let id = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap() .unwrap() @@ -1126,11 +1085,8 @@ impl DbSetup for ChunkOrder { let partition = partition.read(); let chunks = LockablePartition::chunks(&partition); let mut partition = partition.upgrade(); - let flush_handle = LockablePartition::prepare_persist( - &mut partition, - Instant::now() + Duration::from_secs(1), - ) - .unwrap(); + let flush_handle = + LockablePartition::prepare_persist(&mut partition, chrono::MAX_DATETIME).unwrap(); (chunks, flush_handle) }; diff --git a/query_tests/src/scenarios/delete.rs 
b/query_tests/src/scenarios/delete.rs index 46dbd396a9..2d38f6ecb9 100644 --- a/query_tests/src/scenarios/delete.rs +++ b/query_tests/src/scenarios/delete.rs @@ -9,7 +9,6 @@ use async_trait::async_trait; use query::QueryChunk; use std::fmt::Display; use std::sync::Arc; -use std::time::{Duration, Instant}; use server::db::test_helpers::write_lp; use server::utils::make_db; @@ -652,11 +651,7 @@ async fn make_chunk_with_deletes_at_different_stages( match chunk_stage { ChunkStage::RubOs | ChunkStage::Os => { let chunk_result = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap(); @@ -778,11 +773,7 @@ async fn make_different_stage_chunks_with_deletes_scenario( match chunk_data.chunk_stage { ChunkStage::RubOs | ChunkStage::Os => { let chunk = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap() .unwrap(); diff --git a/server/src/database.rs b/server/src/database.rs index fee9f40c15..aa69b9c4ad 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1433,13 +1433,9 @@ mod tests { assert!(db.partition_summary("table_1", "partition_by_a").is_some()); // persist one partition - db.persist_partition( - "table_1", - "partition_by_b", - Instant::now() + Duration::from_secs(2), - ) - .await - .unwrap(); + db.persist_partition("table_1", "partition_by_b", true) + .await + .unwrap(); // shutdown first database database.shutdown(); diff --git a/server/src/db.rs b/server/src/db.rs index e73b5c14fe..5a1d23c4ca 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -300,8 +300,10 @@ pub struct Db { /// - to keep the lifecycle state (e.g. the number of running compactions) around lifecycle_policy: tokio::sync::Mutex>>, - /// TESTING ONLY: Mocked `Instant::now()` for the background worker - background_worker_now_override: Mutex>, + /// TESTING ONLY: Mocked `Utc::now()` for the background worker + /// + /// TODO: Replace with TimeProvider (#2722) + now_override: Mutex>>, /// To-be-written delete predicates. delete_predicates_mailbox: Mutex, Vec)>>, @@ -357,7 +359,7 @@ impl Db { write_buffer_producer: database_to_commit.write_buffer_producer, cleanup_lock: Default::default(), lifecycle_policy: tokio::sync::Mutex::new(None), - background_worker_now_override: Default::default(), + now_override: Default::default(), delete_predicates_mailbox: Default::default(), }; let this = Arc::new(this); @@ -665,31 +667,17 @@ impl Db { /// Persist given partition. /// + /// If `force` is `true` will persist all unpersisted data regardless of arrival time + /// /// Errors if there is nothing to persist at the moment as per the lifecycle rules. If successful it returns the /// chunk that contains the persisted data. /// - /// The `now` timestamp should normally be `Instant::now()` but can be altered for testing. pub async fn persist_partition( self: &Arc, table_name: &str, partition_key: &str, - now: Instant, + force: bool, ) -> Result>> { - self.persist_partition_with_timestamp(table_name, partition_key, now, Utc::now) - .await - } - - /// Internal use only for testing. 
- async fn persist_partition_with_timestamp( - self: &Arc, - table_name: &str, - partition_key: &str, - now: Instant, - f_parquet_creation_timestamp: F, - ) -> Result>> - where - F: Fn() -> DateTime + Send, - { // Use explicit scope to ensure the async generator doesn't // assume the locks have to possibly live across the `await` let fut = { @@ -702,7 +690,10 @@ impl Db { // get flush handle let flush_handle = partition .persistence_windows_mut() - .map(|window| window.flush_handle(now)) + .map(|window| match force { + true => window.flush_all_handle(), + false => window.flush_handle(self.utc_now()), + }) .flatten() .context(CannotFlushPartition { table_name, @@ -732,13 +723,8 @@ impl Db { }) .collect::, _>>()?; - let (_, fut) = lifecycle::persist_chunks( - partition, - chunks, - flush_handle, - f_parquet_creation_timestamp, - ) - .context(LifecycleError)?; + let (_, fut) = lifecycle::persist_chunks(partition, chunks, flush_handle) + .context(LifecycleError)?; fut }; @@ -845,9 +831,7 @@ impl Db { .as_mut() .expect("lifecycle policy should be initialized"); - policy - .check_for_work(Utc::now(), self.background_worker_now()) - .await + policy.check_for_work(self.utc_now(), Instant::now()).await } }; @@ -937,11 +921,11 @@ impl Db { info!("finished db background worker"); } - /// `Instant::now()` that is used by the background worker. Can be mocked for testing. - fn background_worker_now(&self) -> Instant { - self.background_worker_now_override - .lock() - .unwrap_or_else(Instant::now) + /// `Utc::now()` that is used by `Db`. Can be mocked for testing. + /// + /// TODO: Remove (#2722) + fn utc_now(&self) -> DateTime { + self.now_override.lock().unwrap_or_else(Utc::now) } async fn cleanup_unreferenced_parquet_files( @@ -1233,21 +1217,21 @@ impl Db { row_count, min_time, max_time, - self.background_worker_now(), + self.utc_now(), ); } None => { let mut windows = PersistenceWindows::new( partition.addr().clone(), late_arrival_window, - self.background_worker_now(), + self.utc_now(), ); windows.add_range( sequence, row_count, min_time, max_time, - self.background_worker_now(), + self.utc_now(), ); partition.set_persistence_windows(windows); } @@ -1465,6 +1449,7 @@ mod tests { use arrow::record_batch::RecordBatch; use bytes::Bytes; + use chrono::{DateTime, TimeZone}; use futures::{stream, StreamExt, TryStreamExt}; use predicate::delete_expr::DeleteExpr; use tokio_util::sync::CancellationToken; @@ -1808,13 +1793,9 @@ mod tests { catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size); let t6_write = t5_write + chrono::Duration::seconds(1); + *db.now_override.lock() = Some(t6_write); let chunk_id = db - .persist_partition_with_timestamp( - "cpu", - "1970-01-01T00", - Instant::now() + Duration::from_secs(1), - || t6_write, - ) + .persist_partition("cpu", "1970-01-01T00", true) .await .unwrap() .unwrap() @@ -2244,13 +2225,9 @@ mod tests { .unwrap(); // Write the RB chunk to Object Store but keep it in RB let t3_persist = t2_write + chrono::Duration::seconds(1); + *db.now_override.lock() = Some(t3_persist); let pq_chunk = db - .persist_partition_with_timestamp( - "cpu", - partition_key, - Instant::now() + Duration::from_secs(1), - || t3_persist, - ) + .persist_partition("cpu", partition_key, true) .await .unwrap() .unwrap(); @@ -2346,13 +2323,9 @@ mod tests { .unwrap(); // Write the RB chunk to Object Store but keep it in RB let t3_persist = t2_write + chrono::Duration::seconds(1); + *db.now_override.lock() = Some(t3_persist); let pq_chunk = db - 
.persist_partition_with_timestamp( - "cpu", - partition_key, - Instant::now() + Duration::from_secs(1), - || t3_persist, - ) + .persist_partition("cpu", partition_key, true) .await .unwrap() .unwrap(); @@ -2800,14 +2773,11 @@ mod tests { // Persist rb to parquet os let t4_persist = t3_write + chrono::Duration::seconds(1); - db.persist_partition_with_timestamp( - "cpu", - "1970-01-01T00", - Instant::now() + Duration::from_secs(1), - || t4_persist, - ) - .await - .unwrap(); + *db.now_override.lock() = Some(t4_persist); + db.persist_partition("cpu", "1970-01-01T00", true) + .await + .unwrap() + .unwrap(); // Check first/last write times on the chunks at this point let mut chunk_summaries = db.chunk_summaries().expect("expected summary to return"); @@ -2950,13 +2920,9 @@ mod tests { db.compact_partition("cpu", "1970-01-01T00").await.unwrap(); // write the read buffer chunk to object store - db.persist_partition( - "cpu", - "1970-01-01T00", - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition("cpu", "1970-01-01T00", true) + .await + .unwrap(); // write into a separate partition write_lp(&db, "cpu bar=1 400000000000000").await; @@ -3136,13 +3102,9 @@ mod tests { assert_ne!(mb_chunk.id(), rb_chunk.id()); // RB => OS - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); // we should have chunks in both the read buffer only assert!(mutable_chunk_ids(&db, partition_key).is_empty()); @@ -3694,13 +3656,9 @@ mod tests { let partition_key = "part_a"; write_lp(&db, "cpu,part=a row=10,selector=0i 10").await; write_lp(&db, "cpu,part=a row=11,selector=1i 11").await; - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); // 2: RUB let partition_key = "part_b"; @@ -3719,12 +3677,9 @@ mod tests { let partition_key = "part_d"; write_lp(&db, "cpu,part=d row=40,selector=0i 40").await; write_lp(&db, "cpu,part=d row=41,selector=1i 41").await; + let chunk_id = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap() .unwrap() @@ -3749,13 +3704,9 @@ mod tests { // ==================== do: preserve another partition ==================== let partition_key = "part_b"; - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition(table_name, partition_key, true) + .await + .unwrap(); // ==================== do: use background worker for a short while ==================== let iters_start = db.worker_iterations_delete_predicate_preservation(); @@ -4041,13 +3992,9 @@ mod tests { )); // once persisted drop should work - db.persist_partition( - "cpu", - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition("cpu", partition_key, true) + .await + .unwrap(); db.drop_partition("cpu", partition_key).await.unwrap(); // no chunks left @@ -4071,13 +4018,9 @@ mod tests { write_lp(db.as_ref(), "cpu bar=2 20").await; let partition_key = "1970-01-01T00"; - db.persist_partition( - "cpu", - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); + db.persist_partition("cpu", partition_key, true) + .await + .unwrap(); // query data 
before drop let expected = vec![ @@ -4117,11 +4060,7 @@ mod tests { // Write the RB chunk to Object Store but keep it in RB let chunk = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap() .unwrap(); diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index eb042e37f3..f4700fb4b0 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -568,7 +568,6 @@ mod tests { utils::make_db, }; use data_types::chunk_metadata::ChunkStorage; - use std::time::{Duration, Instant}; async fn test_chunk_access(chunk: &CatalogChunk) { let t1 = chunk.access_recorder().get_metrics(); @@ -661,11 +660,7 @@ mod tests { write_lp_with_time(&db, "cpu,tag=1 bar=1 1", creation_time).await; let id = db - .persist_partition( - "cpu", - "1970-01-01T00", - Instant::now() + Duration::from_secs(10000), - ) + .persist_partition("cpu", "1970-01-01T00", true) .await .unwrap() .unwrap() @@ -690,19 +685,14 @@ mod tests { async fn parquet_snapshot() { let db = make_db().await.db; - let before_creation = Utc::now(); - write_lp(&db, "cpu,tag=1 bar=1 1").await; - let after_creation = Utc::now(); - write_lp(&db, "cpu,tag=2 bar=2 2").await; - let after_write = Utc::now(); + let w0 = Utc::now(); + write_lp_with_time(&db, "cpu,tag=1 bar=1 1", w0).await; + let w1 = w0 + chrono::Duration::seconds(4); + write_lp_with_time(&db, "cpu,tag=2 bar=2 2", w1).await; - db.persist_partition( - "cpu", - "1970-01-01T00", - Instant::now() + Duration::from_secs(10000), - ) - .await - .unwrap(); + db.persist_partition("cpu", "1970-01-01T00", true) + .await + .unwrap(); let chunks = db.catalog.chunks(); assert_eq!(chunks.len(), 1); @@ -713,9 +703,7 @@ mod tests { let first_write = snapshot.time_of_first_write(); let last_write = snapshot.time_of_last_write(); - assert!(before_creation < first_write); - assert!(first_write < after_creation); - assert!(first_write < last_write); - assert!(last_write < after_write); + assert_eq!(w0, first_write); + assert_eq!(w1, last_write); } } diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 0731bfac3f..f652f458c8 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -29,7 +29,6 @@ use std::{ convert::TryInto, fmt::Display, sync::{Arc, Weak}, - time::Instant, }; use tracker::{RwLock, TaskTracker}; @@ -203,7 +202,7 @@ impl LockablePartition for LockableCatalogPartition { fn prepare_persist( partition: &mut LifecycleWriteGuard<'_, Self::Partition, Self>, - now: Instant, + now: DateTime, ) -> Option { let window = partition.persistence_windows_mut().unwrap(); let handle = window.flush_handle(now); @@ -217,7 +216,7 @@ impl LockablePartition for LockableCatalogPartition { handle: Self::PersistHandle, ) -> Result, Self::Error> { info!(table=%partition.table_name(), partition=%partition.partition_key(), "persisting chunks"); - let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0, Utc::now)?; + let (tracker, fut) = persist::persist_chunks(partition, chunks, handle.0)?; let _ = tokio::spawn(async move { fut.await.log_if_error("persisting chunks") }); Ok(tracker) } @@ -288,13 +287,13 @@ impl LifecyclePartition for Partition { .unwrap_or(true) } - fn persistable_row_count(&self, now: Instant) -> usize { + fn persistable_row_count(&self, now: DateTime) -> usize { self.persistence_windows() .map(|w| w.persistable_row_count(now)) .unwrap_or(0) } - fn minimum_unpersisted_age(&self) -> Option { + fn minimum_unpersisted_age(&self) -> 
Option> { self.persistence_windows() .and_then(|w| w.minimum_unpersisted_age()) } diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index 333f37f9a1..f55e6e253c 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -18,19 +18,18 @@ use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; /// Split and then persist the provided chunks /// +/// `flush_handle` describes both what to persist and also acts as a transaction +/// on the persistence windows +/// /// TODO: Replace low-level locks with transaction object -pub fn persist_chunks( +pub fn persist_chunks( partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, chunks: Vec>, flush_handle: FlushHandle, - f_parquet_creation_timestamp: F, ) -> Result<( TaskTracker, TrackedFuture>>> + Send>, -)> -where - F: Fn() -> DateTime + Send, -{ +)> { let now = std::time::Instant::now(); // time persist duration. let db = Arc::clone(&partition.data().db); let addr = partition.addr().clone(); @@ -188,13 +187,7 @@ where }; let to_persist = to_persist.write(); - write_chunk_to_object_store( - partition_write, - to_persist, - flush_handle, - f_parquet_creation_timestamp, - )? - .1 + write_chunk_to_object_store(partition_write, to_persist, flush_handle)?.1 }; // Wait for write operation to complete @@ -227,7 +220,7 @@ mod tests { use query::QueryDatabase; use std::{ num::{NonZeroU32, NonZeroU64}, - time::{Duration, Instant}, + time::Duration, }; async fn test_db() -> Arc { @@ -266,14 +259,14 @@ mod tests { let mut partition = partition.upgrade(); - let handle = LockablePartition::prepare_persist(&mut partition, Instant::now()) + let handle = LockablePartition::prepare_persist(&mut partition, Utc::now()) .unwrap() .0; assert_eq!(handle.timestamp(), Utc.timestamp_nanos(10)); let chunks: Vec<_> = chunks.map(|x| x.upgrade()).collect(); - persist_chunks(partition, chunks, handle, Utc::now) + persist_chunks(partition, chunks, handle) .unwrap() .1 .await @@ -292,16 +285,16 @@ mod tests { async fn test_persist_delete() { let db = test_db().await; - let late_arrival = Duration::from_secs(1); + let late_arrival = chrono::Duration::seconds(1); - let t0 = Instant::now(); + let t0 = Utc::now(); let t1 = t0 + late_arrival * 10; let t2 = t1 + late_arrival * 10; - *db.background_worker_now_override.lock() = Some(t0); + *db.now_override.lock() = Some(t0); write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await; - *db.background_worker_now_override.lock() = Some(t1); + *db.now_override.lock() = Some(t1); write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=3 23").await; let partition_keys = db.partition_keys().unwrap(); @@ -318,8 +311,9 @@ mod tests { db.delete("cpu", predicate).await.unwrap(); // Try to persist first write but it has been deleted + *db.now_override.lock() = Some(t0 + late_arrival); let maybe_chunk = db - .persist_partition("cpu", partition_key.as_str(), t0 + late_arrival) + .persist_partition("cpu", partition_key.as_str(), false) .await .unwrap(); @@ -341,13 +335,13 @@ mod tests { ); // Add a second set of writes one of which overlaps the above chunk - *db.background_worker_now_override.lock() = Some(t2); + *db.now_override.lock() = Some(t2); write_lp(db.as_ref(), "cpu,tag1=foo bar=2 23").await; write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=2 26").await; // Persist second write but not third let maybe_chunk = db - .persist_partition("cpu", partition_key.as_str(), t2) + .persist_partition("cpu", partition_key.as_str(), false) .await .unwrap(); 
assert!(maybe_chunk.is_some()); @@ -384,8 +378,9 @@ mod tests { db.delete("cpu", predicate).await.unwrap(); // Try to persist third set of writes + *db.now_override.lock() = Some(t2 + late_arrival); let maybe_chunk = db - .persist_partition("cpu", partition_key.as_str(), t2 + late_arrival) + .persist_partition("cpu", partition_key.as_str(), false) .await .unwrap(); @@ -404,10 +399,10 @@ mod tests { async fn persist_compacted_deletes() { let db = test_db().await; - let late_arrival = Duration::from_secs(1); - let t0 = Instant::now(); + let late_arrival = chrono::Duration::seconds(1); + let t0 = Utc::now(); - *db.background_worker_now_override.lock() = Some(t0); + *db.now_override.lock() = Some(t0); write_lp(db.as_ref(), "cpu,tag1=cupcakes bar=1 10").await; let partition_keys = db.partition_keys().unwrap(); @@ -439,8 +434,9 @@ mod tests { // Persistence windows unaware rows have been deleted assert!(!partition.read().persistence_windows().unwrap().is_empty()); + *db.now_override.lock() = Some(t0 + late_arrival); let maybe_chunk = db - .persist_partition("cpu", partition_key.as_str(), t0 + late_arrival * 2) + .persist_partition("cpu", partition_key.as_str(), false) .await .unwrap(); diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs index 5029311e8c..efd2e77da3 100644 --- a/server/src/db/lifecycle/write.rs +++ b/server/src/db/lifecycle/write.rs @@ -12,7 +12,6 @@ use crate::db::{ use ::lifecycle::LifecycleWriteGuard; -use chrono::{DateTime, Utc}; use data_types::{chunk_metadata::ChunkLifecycleAction, job::Job}; use internal_types::selection::Selection; use observability_deps::tracing::{debug, warn}; @@ -39,20 +38,20 @@ use super::{ /// The implementation for writing a chunk to the object store /// +/// `flush_handle` describes both what to persist and also acts as a transaction +/// on the persistence windows +/// /// Returns a future registered with the tracker registry, and the corresponding tracker +/// /// The caller can either spawn this future to tokio, or block directly on it -pub(super) fn write_chunk_to_object_store( +pub(super) fn write_chunk_to_object_store( partition: LifecycleWriteGuard<'_, Partition, LockableCatalogPartition>, mut chunk: LifecycleWriteGuard<'_, CatalogChunk, LockableCatalogChunk>, flush_handle: FlushHandle, - f_parquet_creation_timestamp: F, ) -> Result<( TaskTracker, TrackedFuture>> + Send>, -)> -where - F: Fn() -> DateTime + Send, -{ +)> { let db = Arc::clone(&chunk.data().db); let addr = chunk.addr().clone(); let table_name = Arc::clone(&addr.table_name); @@ -125,7 +124,7 @@ where // IMPORTANT: Writing must take place while holding the cleanup lock, otherwise the file might be deleted // between creation and the transaction commit. 
let metadata = IoxMetadata { - creation_timestamp: f_parquet_creation_timestamp(), + creation_timestamp: db.utc_now(), table_name: Arc::clone(&table_name), partition_key: Arc::clone(&partition_key), chunk_id: addr.chunk_id, diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index 97f45ef542..9ec526fc48 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -130,7 +130,7 @@ pub async fn seek_to_end(db: &Db, write_buffer: &mut dyn WriteBufferReading) -> let mut windows = PersistenceWindows::new( partition.addr().clone(), late_arrival_window, - db.background_worker_now(), + db.utc_now(), ); windows.mark_seen_and_persisted(&dummy_checkpoint); partition.set_persistence_windows(windows); @@ -236,7 +236,7 @@ pub async fn perform_replay( |sequence, partition_key, table_batch| { filter_entry(sequence, partition_key, table_batch, replay_plan) }, - Utc::now(), + db.utc_now(), ) { Ok(_) => { break; @@ -289,7 +289,7 @@ pub async fn perform_replay( let mut windows = PersistenceWindows::new( partition.addr().clone(), late_arrival_window, - db.background_worker_now(), + db.utc_now(), ); windows.mark_seen_and_persisted(partition_checkpoint); partition.set_persistence_windows(windows); @@ -418,7 +418,7 @@ mod tests { }; use arrow_util::assert_batches_eq; - use chrono::Utc; + use chrono::{DateTime, Utc}; use data_types::{ database_rules::{PartitionTemplate, Partitioner, TemplatePart}, server_id::ServerId, @@ -576,7 +576,7 @@ mod tests { db_name, partition_template.clone(), self.catalog_transactions_until_checkpoint, - Instant::now(), + Utc::now(), ) .await; @@ -610,7 +610,7 @@ mod tests { join_handle.await.unwrap(); // remember time - let now = test_db.db.background_worker_now_override.lock().unwrap(); + let now = test_db.db.now_override.lock().unwrap(); // drop old DB drop(test_db); @@ -656,14 +656,7 @@ mod tests { for (table_name, partition_key) in partitions { println!("Persist {}:{}", table_name, partition_key); loop { - match db - .persist_partition( - table_name, - partition_key, - db.background_worker_now(), - ) - .await - { + match db.persist_partition(table_name, partition_key, false).await { Ok(_) => break, Err(crate::db::Error::CannotFlushPartition { .. 
}) => { // cannot persist right now because of some lifecycle action, so wait a bit @@ -700,8 +693,8 @@ mod tests { } } Step::MakeWritesPersistable => { - let mut guard = test_db.db.background_worker_now_override.lock(); - *guard = Some(guard.unwrap() + Duration::from_secs(60)); + let mut guard = test_db.db.now_override.lock(); + *guard = Some(guard.unwrap() + chrono::Duration::seconds(60)); } Step::Assert(checks) => { Self::eval_checks(&checks, true, &test_db).await; @@ -768,7 +761,7 @@ mod tests { db_name: &'static str, partition_template: PartitionTemplate, catalog_transactions_until_checkpoint: NonZeroU64, - now: Instant, + now: DateTime, ) -> (TestDb, CancellationToken, JoinHandle<()>) { let test_db = TestDb::builder() .object_store(object_store) @@ -786,7 +779,7 @@ mod tests { .await; // Mock time - *test_db.db.background_worker_now_override.lock() = Some(now); + *test_db.db.now_override.lock() = Some(now); // start background worker let shutdown: CancellationToken = Default::default(); diff --git a/server_benchmarks/benches/catalog_persistence.rs b/server_benchmarks/benches/catalog_persistence.rs index 0fc9b4988e..b767083f48 100644 --- a/server_benchmarks/benches/catalog_persistence.rs +++ b/server_benchmarks/benches/catalog_persistence.rs @@ -7,7 +7,7 @@ use std::{ convert::TryFrom, num::{NonZeroU32, NonZeroU64}, sync::Arc, - time::{Duration, Instant}, + time::Duration, }; use tokio::{ runtime::{Handle, Runtime}, @@ -88,11 +88,7 @@ async fn setup(object_store: Arc, done: &Mutex) { .unwrap(); let chunk = db - .persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) + .persist_partition(table_name, partition_key, true) .await .unwrap() .unwrap(); diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 4d8bb67914..d8bb3bd9cf 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -1,7 +1,6 @@ use std::convert::{TryFrom, TryInto}; use std::fmt::Debug; use std::sync::Arc; -use std::time::Instant; use data_types::chunk_metadata::ChunkId; use data_types::{server_id::ServerId, DatabaseName}; @@ -573,7 +572,7 @@ where .db(&db_name) .map_err(default_server_error_handler)?; - db.persist_partition(&table_name, &partition_key, Instant::now()) + db.persist_partition(&table_name, &partition_key, false) .await .map_err(default_db_error_handler)?;
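For reviewers, a minimal sketch (not part of the patch) of how the reworked test-only surface shown above fits together: `persist_partition` now takes a `force` flag instead of an `Instant` deadline, and the wall clock observed by `Db` can be pinned through the `now_override` field. Both usages are lifted from the test changes in this diff; `make_db` and `write_lp` are the existing test helpers referenced throughout, and the timestamp value is arbitrary.

use chrono::{TimeZone, Utc};

#[tokio::test]
async fn persist_with_pinned_clock() {
    // Existing test helpers (server::utils::make_db, server::db::test_helpers::write_lp).
    let db = make_db().await.db;
    write_lp(&db, "cpu bar=1 10").await;

    // Pin the clock used by Db; this test-only hook replaces the old
    // background_worker_now_override and feeds both the lifecycle policy
    // and the parquet creation_timestamp.
    *db.now_override.lock() = Some(Utc.timestamp(100, 0));

    // force = true persists all unpersisted data regardless of arrival time,
    // so tests no longer need to fabricate an Instant in the future.
    let chunk_id = db
        .persist_partition("cpu", "1970-01-01T00", true)
        .await
        .unwrap()
        .unwrap()
        .id();
    println!("persisted chunk {:?}", chunk_id);
}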