From d1a4584dfc0134aa57951624cbc7de570c54cf12 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 2 Aug 2021 12:58:33 +0200
Subject: [PATCH] feat: easy way to suppress persistence from lifecycle policy

---
 lifecycle/src/policy.rs | 75 ++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 74 insertions(+), 1 deletion(-)

diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 8207fe2772..3f964dc905 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -37,20 +37,44 @@ where
 
     /// Background tasks spawned by this `LifecyclePolicy`
     trackers: Vec<TaskTracker<ChunkLifecycleAction>>,
+
+    /// Do not allow persistence even when the database rules would allow that.
+    ///
+    /// This can be helpful during some phases of the database startup process.
+    suppress_persistence: bool,
 }
 
 impl<M> LifecyclePolicy<M>
 where
     M: LifecycleDb,
 {
+    /// Create new policy.
+    ///
+    /// Persistence is allowed if the database rules allow it.
     pub fn new(db: M) -> Self {
         Self {
            db,
            trackers: vec![],
            active_compactions: 0,
+           suppress_persistence: false,
         }
     }
 
+    /// Create new policy that suppresses persistence even when the database rules allow it.
+    pub fn new_suppress_persistence(db: M) -> Self {
+        Self {
+            db,
+            trackers: vec![],
+            active_compactions: 0,
+            suppress_persistence: true,
+        }
+    }
+
+    /// Stop suppressing persistence and allow it if the database rules allow it.
+    pub fn unsuppress_persistence(&mut self) {
+        self.suppress_persistence = false;
+    }
+
     /// Check if database exceeds memory limits and free memory if necessary
     ///
     /// The policy will first try to unload persisted chunks in order of creation
@@ -485,7 +509,7 @@ where
             // if the criteria for persistence have been satisfied,
             // but persistence cannot proceed because of in-progress
             // compactions
-            let stall_compaction_persisting = if rules.persist {
+            let stall_compaction_persisting = if rules.persist && !self.suppress_persistence {
                 let persisting =
                     self.maybe_persist_chunks(&db_name, partition, &rules, now_instant);
                 if persisting {
@@ -834,6 +858,12 @@ mod tests {
             for chunk in &chunks {
                 partition.chunks.remove(&chunk.addr.chunk_id);
                 new_chunk.row_count += chunk.row_count;
+                new_chunk.min_timestamp = match (new_chunk.min_timestamp, chunk.min_timestamp) {
+                    (Some(ts1), Some(ts2)) => Some(ts1.min(ts2)),
+                    (Some(ts), None) => Some(ts),
+                    (None, Some(ts)) => Some(ts),
+                    (None, None) => None,
+                };
             }
 
             partition
@@ -1585,6 +1615,49 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_suppress_persistence() {
+        let rules = LifecycleRules {
+            persist: true,
+            persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
+            late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
+            persist_age_threshold_seconds: NonZeroU32::new(10).unwrap(),
+            max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()),
+            ..Default::default()
+        };
+        let now = Instant::now();
+
+        let partitions = vec![
+            // Sufficient rows => could persist but should be suppressed
+            TestPartition::new(vec![
+                TestChunk::new(2, 0, ChunkStorage::ClosedMutableBuffer)
+                    .with_min_timestamp(from_secs(10)),
+                TestChunk::new(3, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
+            ])
+            .with_persistence(1_000, now, from_secs(20)),
+        ];
+
+        let db = TestDb::from_partitions(rules, partitions);
+        let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db);
+
+        lifecycle.check_for_work(from_secs(0), now);
+        assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![2, 3]),]);
+
+        lifecycle.check_for_work(from_secs(0), now);
+        assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![2, 3]),]);
+
+        lifecycle.unsuppress_persistence();
+
+        lifecycle.check_for_work(from_secs(0), now);
+        assert_eq!(
+            *db.events.read(),
+            vec![
+                MoverEvents::Compact(vec![2, 3]),
+                MoverEvents::Persist(vec![4])
+            ]
+        );
+    }
+
     #[test]
     fn test_moves_open() {
         let rules = LifecycleRules {
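
Usage sketch (not part of the patch): the sequence below mirrors how the test above drives
the policy. It is a hypothetical sketch, not confirmed usage from the source: the `db`
handle (anything implementing `LifecycleDb`), the time arguments, and the surrounding
control flow are assumptions; only `new_suppress_persistence`, `unsuppress_persistence`,
and `check_for_work` come from this change.

    // Hypothetical startup sequence; `db` and the time values are placeholders.
    let mut policy = LifecyclePolicy::new_suppress_persistence(&db);

    // While suppressed, the policy may still schedule compactions, but the
    // persistence branch is skipped because the new flag short-circuits the
    // `rules.persist` check.
    policy.check_for_work(Utc::now(), Instant::now());

    // After the startup phase, restore the behaviour of `LifecyclePolicy::new`:
    // persistence again happens whenever the database rules allow it.
    policy.unsuppress_persistence();
    policy.check_for_work(Utc::now(), Instant::now());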