From 844a025c7cb5835880e1ad365238591366c59de4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 23 Jul 2021 09:31:28 +0100 Subject: [PATCH] feat: drop based on LRU (#2075) (#2092) * feat: drop based on LRU (#2075) * chore: review feedback Co-authored-by: Andrew Lamb Co-authored-by: Andrew Lamb Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 1 + lifecycle/Cargo.toml | 1 + lifecycle/src/lib.rs | 4 +- lifecycle/src/policy.rs | 316 ++++++++++++++++--------------------- server/src/db/lifecycle.rs | 39 ++--- 5 files changed, 162 insertions(+), 199 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50185e1843..3ceb745416 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1889,6 +1889,7 @@ dependencies = [ "data_types", "futures", "hashbrown", + "internal_types", "observability_deps", "tokio", "tracker", diff --git a/lifecycle/Cargo.toml b/lifecycle/Cargo.toml index 1742bfa6d8..b3974cb973 100644 --- a/lifecycle/Cargo.toml +++ b/lifecycle/Cargo.toml @@ -10,6 +10,7 @@ chrono = "0.4" data_types = { path = "../data_types" } futures = "0.3" hashbrown = "0.11" +internal_types = { path = "../internal_types" } observability_deps = { path = "../observability_deps" } tokio = { version = "1.0", features = ["macros", "time"] } tracker = { path = "../tracker" } diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs index ff55cf062f..c8ccd92bee 100644 --- a/lifecycle/src/lib.rs +++ b/lifecycle/src/lib.rs @@ -14,6 +14,7 @@ use data_types::chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage}; use data_types::database_rules::LifecycleRules; use data_types::DatabaseName; pub use guard::*; +use internal_types::access::AccessMetrics; pub use policy::*; use std::time::Instant; use tracker::TaskTracker; @@ -173,7 +174,8 @@ pub trait LifecycleChunk { /// Returns the min timestamp contained within this chunk fn min_timestamp(&self) -> DateTime; - fn time_of_first_write(&self) -> Option>; + /// Returns the access metrics for this chunk + fn access_metrics(&self) -> AccessMetrics; fn time_of_last_write(&self) -> Option>; diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index beb35464ea..73c4760587 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -15,6 +15,7 @@ use crate::{ LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition, PersistHandle, }; +use internal_types::access::AccessMetrics; /// Number of seconds to wait before retying a failed lifecycle action pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10); @@ -96,7 +97,7 @@ where partition, action, chunk_id: chunk.addr().chunk_id, - first_write: chunk.time_of_first_write(), + access_metrics: chunk.access_metrics(), }) } } @@ -637,18 +638,16 @@ struct FreeCandidate<'a, P> { partition: &'a P, chunk_id: u32, action: FreeAction, - first_write: Option>, + access_metrics: AccessMetrics, } fn sort_free_candidates

(candidates: &mut Vec>) { candidates.sort_unstable_by(|a, b| match a.action.cmp(&b.action) { - // Order candidates with the same FreeAction by first write time, with nulls last - std::cmp::Ordering::Equal => match (a.first_write.as_ref(), b.first_write.as_ref()) { - (Some(a), Some(b)) => a.cmp(b), - (None, Some(_)) => std::cmp::Ordering::Greater, - (Some(_), None) => std::cmp::Ordering::Less, - (None, None) => std::cmp::Ordering::Equal, - }, + // Order candidates with the same FreeAction by last access time + std::cmp::Ordering::Equal => a + .access_metrics + .last_instant + .cmp(&b.access_metrics.last_instant), o => o, }) } @@ -710,19 +709,14 @@ mod tests { addr: ChunkAddr, row_count: usize, min_timestamp: Option>, - time_of_first_write: Option>, + access_metrics: AccessMetrics, time_of_last_write: Option>, lifecycle_action: Option>, storage: ChunkStorage, } impl TestChunk { - fn new( - id: u32, - time_of_first_write: Option, - time_of_last_write: Option, - storage: ChunkStorage, - ) -> Self { + fn new(id: u32, time_of_last_write: Option, storage: ChunkStorage) -> Self { let addr = ChunkAddr { db_name: Arc::from(""), table_name: Arc::from(""), @@ -734,47 +728,34 @@ mod tests { addr, row_count: 10, min_timestamp: None, - time_of_first_write: time_of_first_write.map(from_secs), + access_metrics: AccessMetrics { + count: 0, + last_instant: Instant::now(), + }, time_of_last_write: time_of_last_write.map(from_secs), lifecycle_action: None, storage, } } - fn with_row_count(self, row_count: usize) -> Self { - Self { - addr: self.addr, - row_count, - min_timestamp: self.min_timestamp, - time_of_first_write: self.time_of_first_write, - time_of_last_write: self.time_of_last_write, - lifecycle_action: self.lifecycle_action, - storage: self.storage, - } + fn with_row_count(mut self, row_count: usize) -> Self { + self.row_count = row_count; + self } - fn with_action(self, action: ChunkLifecycleAction) -> Self { - Self { - addr: self.addr, - row_count: self.row_count, - min_timestamp: self.min_timestamp, - time_of_first_write: self.time_of_first_write, - time_of_last_write: self.time_of_last_write, - lifecycle_action: Some(TaskTracker::complete(action)), - storage: self.storage, - } + fn with_action(mut self, action: ChunkLifecycleAction) -> Self { + self.lifecycle_action = Some(TaskTracker::complete(action)); + self } - fn with_min_timestamp(self, min_timestamp: DateTime) -> Self { - Self { - addr: self.addr, - row_count: self.row_count, - min_timestamp: Some(min_timestamp), - time_of_first_write: self.time_of_first_write, - time_of_last_write: self.time_of_last_write, - lifecycle_action: self.lifecycle_action, - storage: self.storage, - } + fn with_min_timestamp(mut self, min_timestamp: DateTime) -> Self { + self.min_timestamp = Some(min_timestamp); + self + } + + fn with_access_metrics(mut self, metrics: AccessMetrics) -> Self { + self.access_metrics = metrics; + self } } @@ -855,7 +836,7 @@ mod tests { let id = partition.next_id; partition.next_id += 1; - let mut new_chunk = TestChunk::new(id, None, None, ChunkStorage::ReadBuffer); + let mut new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer); new_chunk.row_count = 0; for chunk in &chunks { @@ -895,7 +876,7 @@ mod tests { partition.next_id += 1; // The remainder left behind after the split - let new_chunk = TestChunk::new(id, None, None, ChunkStorage::ReadBuffer) + let new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer) .with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1)); partition @@ -988,8 +969,8 @@ mod tests { self.min_timestamp.unwrap() } - fn time_of_first_write(&self) -> Option> { - self.time_of_first_write + fn access_metrics(&self) -> AccessMetrics { + self.access_metrics.clone() } fn time_of_last_write(&self) -> Option> { @@ -1113,70 +1094,61 @@ mod tests { mub_row_threshold: NonZeroUsize::new(74).unwrap(), ..Default::default() }; - let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer); + let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer); assert!(!can_move(&rules, &chunk, from_secs(9))); assert!(can_move(&rules, &chunk, from_secs(11))); // can move even if the chunk is small - let chunk = - TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer).with_row_count(73); + let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer).with_row_count(73); assert!(can_move(&rules, &chunk, from_secs(11))); // If over the row count threshold, we should be able to move - let chunk = - TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer).with_row_count(74); + let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(74); assert!(can_move(&rules, &chunk, from_secs(0))); // If below the default row count threshold, it shouldn't move - let chunk = - TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer).with_row_count(73); + let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(73); assert!(!can_move(&rules, &chunk, from_secs(0))); } #[test] fn test_sort_free_candidates() { + let instant = Instant::now(); + let access_metrics = |secs: u64| AccessMetrics { + count: 1, + last_instant: instant + Duration::from_secs(secs), + }; + let mut candidates = vec![ - FreeCandidate { - partition: &(), - chunk_id: 0, - action: FreeAction::Drop, - first_write: None, - }, - FreeCandidate { - partition: &(), - chunk_id: 2, - action: FreeAction::Unload, - first_write: None, - }, FreeCandidate { partition: &(), chunk_id: 1, action: FreeAction::Unload, - first_write: Some(from_secs(40)), + access_metrics: access_metrics(40), }, FreeCandidate { partition: &(), chunk_id: 3, action: FreeAction::Unload, - first_write: Some(from_secs(20)), + access_metrics: access_metrics(20), }, FreeCandidate { partition: &(), chunk_id: 4, action: FreeAction::Unload, - first_write: Some(from_secs(10)), + access_metrics: access_metrics(10), }, FreeCandidate { partition: &(), chunk_id: 5, action: FreeAction::Drop, - first_write: Some(from_secs(10)), + access_metrics: access_metrics(10), }, FreeCandidate { partition: &(), chunk_id: 6, action: FreeAction::Drop, - first_write: Some(from_secs(5)), + access_metrics: access_metrics(5), }, ]; @@ -1186,8 +1158,8 @@ mod tests { // Should first unload, then drop // - // Should order the same actions by write time, with nulls last - assert_eq!(ids, vec![4, 3, 1, 2, 6, 5, 0]) + // Should order the same actions by access time, with nulls last + assert_eq!(ids, vec![4, 3, 1, 6, 5]) } #[test] @@ -1195,9 +1167,9 @@ mod tests { // The default rules shouldn't do anything let rules = LifecycleRules::default(); let chunks = vec![ - TestChunk::new(0, Some(1), Some(1), ChunkStorage::OpenMutableBuffer), - TestChunk::new(1, Some(20), Some(1), ChunkStorage::OpenMutableBuffer), - TestChunk::new(2, Some(30), Some(1), ChunkStorage::OpenMutableBuffer), + TestChunk::new(0, Some(1), ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, Some(1), ChunkStorage::OpenMutableBuffer), + TestChunk::new(2, Some(1), ChunkStorage::OpenMutableBuffer), ]; let db = TestDb::new(rules, chunks); @@ -1213,9 +1185,9 @@ mod tests { ..Default::default() }; let chunks = vec![ - TestChunk::new(0, Some(0), Some(8), ChunkStorage::OpenMutableBuffer), - TestChunk::new(1, Some(0), Some(5), ChunkStorage::OpenMutableBuffer), - TestChunk::new(2, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), + TestChunk::new(0, Some(8), ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, Some(5), ChunkStorage::OpenMutableBuffer), + TestChunk::new(2, Some(0), ChunkStorage::OpenMutableBuffer), ]; let db = TestDb::new(rules, chunks); @@ -1290,12 +1262,7 @@ mod tests { ..Default::default() }; - let chunks = vec![TestChunk::new( - 0, - Some(0), - Some(0), - ChunkStorage::OpenMutableBuffer, - )]; + let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; let db = TestDb::new(rules.clone(), chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1303,26 +1270,44 @@ mod tests { lifecycle.check_for_work(from_secs(10), Instant::now()); assert_eq!(*db.events.read(), vec![]); - let chunks = vec![ - // two "open" chunks => they must not be dropped (yet) - TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), - TestChunk::new(1, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), - // "moved" chunk => can be dropped because `drop_non_persistent=true` - TestChunk::new(2, Some(0), Some(0), ChunkStorage::ReadBuffer), - // "writing" chunk => cannot be unloaded while write is in-progress - TestChunk::new(3, Some(0), Some(0), ChunkStorage::ReadBuffer) - .with_action(ChunkLifecycleAction::Persisting), - // "written" chunk => can be unloaded - TestChunk::new(4, Some(0), Some(0), ChunkStorage::ReadBufferAndObjectStore), - ]; + let instant = Instant::now(); + + let chunks = + vec![ + // two "open" chunks => they must not be dropped (yet) + TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer), + // "moved" chunk => can be dropped because `drop_non_persistent=true` + TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer), + // "writing" chunk => cannot be unloaded while write is in-progress + TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) + .with_action(ChunkLifecycleAction::Persisting), + // "written" chunk => can be unloaded + TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore) + .with_access_metrics(AccessMetrics { + count: 1, + last_instant: instant, + }), + // "written" chunk => can be unloaded + TestChunk::new(5, Some(0), ChunkStorage::ReadBufferAndObjectStore) + .with_access_metrics(AccessMetrics { + count: 12, + last_instant: instant - Duration::from_secs(1), + }), + ]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); + // Should unload chunk 5 first as access time is smaller lifecycle.check_for_work(from_secs(10), Instant::now()); assert_eq!( *db.events.read(), - vec![MoverEvents::Unload(4), MoverEvents::Drop(2)] + vec![ + MoverEvents::Unload(5), + MoverEvents::Unload(4), + MoverEvents::Drop(2) + ] ); } @@ -1339,12 +1324,7 @@ mod tests { }; assert!(!rules.drop_non_persisted); - let chunks = vec![TestChunk::new( - 0, - Some(0), - Some(0), - ChunkStorage::OpenMutableBuffer, - )]; + let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; let db = TestDb::new(rules.clone(), chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1354,15 +1334,15 @@ mod tests { let chunks = vec![ // two "open" chunks => they must not be dropped (yet) - TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), - TestChunk::new(1, Some(0), Some(0), ChunkStorage::OpenMutableBuffer), + TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer), // "moved" chunk => cannot be dropped because `drop_non_persistent=false` - TestChunk::new(2, Some(0), Some(0), ChunkStorage::ReadBuffer), + TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer), // "writing" chunk => cannot be drop while write is in-progess - TestChunk::new(3, Some(0), Some(0), ChunkStorage::ReadBuffer) + TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) .with_action(ChunkLifecycleAction::Persisting), // "written" chunk => can be unloaded - TestChunk::new(4, Some(0), Some(0), ChunkStorage::ReadBufferAndObjectStore), + TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore), ]; let db = TestDb::new(rules, chunks); @@ -1380,12 +1360,7 @@ mod tests { ..Default::default() }; - let chunks = vec![TestChunk::new( - 0, - Some(0), - Some(0), - ChunkStorage::OpenMutableBuffer, - )]; + let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1408,61 +1383,55 @@ mod tests { let partitions = vec![ TestPartition::new(vec![ // still receiving writes => cannot compact - TestChunk::new(0, Some(0), Some(20), ChunkStorage::OpenMutableBuffer), + TestChunk::new(0, Some(20), ChunkStorage::OpenMutableBuffer), ]), TestPartition::new(vec![ // still receiving writes => cannot compact - TestChunk::new(1, Some(0), Some(20), ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, Some(20), ChunkStorage::OpenMutableBuffer), // closed => can compact - TestChunk::new(2, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(2, Some(20), ChunkStorage::ClosedMutableBuffer), ]), TestPartition::new(vec![ // open but cold => can compact - TestChunk::new(3, Some(0), Some(5), ChunkStorage::OpenMutableBuffer), + TestChunk::new(3, Some(5), ChunkStorage::OpenMutableBuffer), // closed => can compact - TestChunk::new(4, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(4, Some(20), ChunkStorage::ClosedMutableBuffer), // closed => can compact - TestChunk::new(5, Some(0), Some(20), ChunkStorage::ReadBuffer), + TestChunk::new(5, Some(20), ChunkStorage::ReadBuffer), // persisted => cannot compact - TestChunk::new(6, Some(0), Some(20), ChunkStorage::ReadBufferAndObjectStore), + TestChunk::new(6, Some(20), ChunkStorage::ReadBufferAndObjectStore), // persisted => cannot compact - TestChunk::new(7, Some(0), Some(20), ChunkStorage::ObjectStoreOnly), + TestChunk::new(7, Some(20), ChunkStorage::ObjectStoreOnly), ]), TestPartition::new(vec![ // closed => can compact - TestChunk::new(8, Some(0), Some(20), ChunkStorage::ReadBuffer), + TestChunk::new(8, Some(20), ChunkStorage::ReadBuffer), // closed => can compact - TestChunk::new(9, Some(0), Some(20), ChunkStorage::ReadBuffer), + TestChunk::new(9, Some(20), ChunkStorage::ReadBuffer), // persisted => cannot compact - TestChunk::new( - 10, - Some(0), - Some(20), - ChunkStorage::ReadBufferAndObjectStore, - ), + TestChunk::new(10, Some(20), ChunkStorage::ReadBufferAndObjectStore), // persisted => cannot compact - TestChunk::new(11, Some(0), Some(20), ChunkStorage::ObjectStoreOnly), + TestChunk::new(11, Some(20), ChunkStorage::ObjectStoreOnly), ]), TestPartition::new(vec![ // open but cold => can compact - TestChunk::new(12, Some(0), Some(5), ChunkStorage::OpenMutableBuffer), + TestChunk::new(12, Some(5), ChunkStorage::OpenMutableBuffer), ]), TestPartition::new(vec![ // already compacted => should not compact - TestChunk::new(13, Some(0), Some(5), ChunkStorage::ReadBuffer), + TestChunk::new(13, Some(5), ChunkStorage::ReadBuffer), ]), TestPartition::new(vec![ // closed => can compact - TestChunk::new(14, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400), + TestChunk::new(14, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), // too many individual rows => ignore - TestChunk::new(15, Some(0), Some(20), ChunkStorage::ReadBuffer) - .with_row_count(1_000), + TestChunk::new(15, Some(20), ChunkStorage::ReadBuffer).with_row_count(1_000), // closed => can compact - TestChunk::new(16, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400), + TestChunk::new(16, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), // too many total rows => next compaction job - TestChunk::new(17, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400), + TestChunk::new(17, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), // too many total rows => next compaction job - TestChunk::new(18, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400), + TestChunk::new(18, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), ]), ]; @@ -1497,19 +1466,19 @@ mod tests { let partitions = vec![ TestPartition::new(vec![ // closed => can compact - TestChunk::new(0, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(0, Some(20), ChunkStorage::ClosedMutableBuffer), // closed => can compact - TestChunk::new(10, Some(0), Some(30), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(10, Some(30), ChunkStorage::ClosedMutableBuffer), // closed => can compact - TestChunk::new(12, Some(0), Some(40), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(12, Some(40), ChunkStorage::ClosedMutableBuffer), ]), TestPartition::new(vec![ // closed => can compact - TestChunk::new(1, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(1, Some(20), ChunkStorage::ClosedMutableBuffer), ]), TestPartition::new(vec![ // closed => can compact - TestChunk::new(200, Some(0), Some(10), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(200, Some(10), ChunkStorage::ClosedMutableBuffer), ]), ]; @@ -1547,64 +1516,64 @@ mod tests { let partitions = vec![ // Insufficient rows and not old enough => don't persist but can compact TestPartition::new(vec![ - TestChunk::new(0, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(0, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(1, Some(0), Some(0), ChunkStorage::ReadBuffer) + TestChunk::new(1, Some(0), ChunkStorage::ReadBuffer) .with_min_timestamp(from_secs(5)), ]) .with_persistence(10, now, from_secs(20)), // Sufficient rows => persist TestPartition::new(vec![ - TestChunk::new(2, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(2, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(3, Some(0), Some(0), ChunkStorage::ReadBuffer) + TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) .with_min_timestamp(from_secs(5)), ]) .with_persistence(1_000, now, from_secs(20)), // Writes too old => persist TestPartition::new(vec![ // Should split open chunks - TestChunk::new(4, Some(0), Some(20), ChunkStorage::OpenMutableBuffer) + TestChunk::new(4, Some(20), ChunkStorage::OpenMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(5, Some(0), Some(0), ChunkStorage::ReadBuffer) + TestChunk::new(5, Some(0), ChunkStorage::ReadBuffer) .with_min_timestamp(from_secs(5)), - TestChunk::new(6, Some(0), Some(0), ChunkStorage::ObjectStoreOnly) + TestChunk::new(6, Some(0), ChunkStorage::ObjectStoreOnly) .with_min_timestamp(from_secs(5)), ]) .with_persistence(10, now - Duration::from_secs(10), from_secs(20)), // Sufficient rows but conflicting compaction => prevent compaction TestPartition::new(vec![ - TestChunk::new(7, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(7, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)) .with_action(ChunkLifecycleAction::Compacting), // This chunk would be a compaction candidate, but we want to persist it - TestChunk::new(8, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(8, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(9, Some(0), Some(0), ChunkStorage::ReadBuffer) + TestChunk::new(9, Some(0), ChunkStorage::ReadBuffer) .with_min_timestamp(from_secs(5)), ]) .with_persistence(1_000, now, from_secs(20)), // Sufficient rows and non-conflicting compaction => persist TestPartition::new(vec![ - TestChunk::new(10, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(10, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(21)) .with_action(ChunkLifecycleAction::Compacting), - TestChunk::new(11, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(11, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(12, Some(0), Some(0), ChunkStorage::ReadBuffer) + TestChunk::new(12, Some(0), ChunkStorage::ReadBuffer) .with_min_timestamp(from_secs(5)), ]) .with_persistence(1_000, now, from_secs(20)), // Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact TestPartition::new(vec![ - TestChunk::new(13, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(13, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(21)) .with_action(ChunkLifecycleAction::Compacting), - TestChunk::new(14, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(14, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(21)), - TestChunk::new(15, Some(0), Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(15, Some(0), ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(16, Some(0), Some(0), ChunkStorage::ReadBuffer) + TestChunk::new(16, Some(0), ChunkStorage::ReadBuffer) .with_min_timestamp(from_secs(5)), ]) .with_persistence(1_000, now, from_secs(20)), @@ -1635,12 +1604,7 @@ mod tests { late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), ..Default::default() }; - let chunks = vec![TestChunk::new( - 0, - Some(40), - Some(40), - ChunkStorage::OpenMutableBuffer, - )]; + let chunks = vec![TestChunk::new(0, Some(40), ChunkStorage::OpenMutableBuffer)]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1658,7 +1622,6 @@ mod tests { let chunks = vec![TestChunk::new( 0, Some(40), - Some(40), ChunkStorage::ClosedMutableBuffer, )]; @@ -1672,12 +1635,7 @@ mod tests { #[test] fn test_recovers_lifecycle_action() { let rules = LifecycleRules::default(); - let chunks = vec![TestChunk::new( - 0, - None, - None, - ChunkStorage::ClosedMutableBuffer, - )]; + let chunks = vec![TestChunk::new(0, None, ChunkStorage::ClosedMutableBuffer)]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 1f5f20a032..c0fdc545b3 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -12,6 +12,7 @@ use data_types::job::Job; use data_types::partition_metadata::Statistics; use data_types::DatabaseName; use datafusion::physical_plan::SendableRecordBatchStream; +use internal_types::access::AccessMetrics; use internal_types::schema::merge::SchemaMerger; use internal_types::schema::{Schema, TIME_COLUMN_NAME}; use lifecycle::{ @@ -19,6 +20,7 @@ use lifecycle::{ LockablePartition, }; use observability_deps::tracing::{info, trace}; +use persistence_windows::persistence_windows::FlushHandle; use query::QueryChunkMeta; use tracker::{RwLock, TaskTracker}; @@ -31,7 +33,6 @@ pub(crate) use drop::drop_chunk; pub(crate) use error::{Error, Result}; pub(crate) use move_chunk::move_chunk_to_read_buffer; pub(crate) use persist::persist_chunks; -use persistence_windows::persistence_windows::FlushHandle; pub(crate) use unload::unload_read_buffer_chunk; use super::DbChunk; @@ -291,8 +292,24 @@ impl LifecycleChunk for CatalogChunk { .expect("failed to clear lifecycle action") } - fn time_of_first_write(&self) -> Option> { - self.time_of_first_write() + fn min_timestamp(&self) -> DateTime { + let table_summary = self.table_summary(); + let col = table_summary + .columns + .iter() + .find(|x| x.name == TIME_COLUMN_NAME) + .expect("time column expected"); + + let min = match &col.stats { + Statistics::I64(stats) => stats.min.expect("time column cannot be empty"), + _ => panic!("unexpected time column type"), + }; + + Utc.timestamp_nanos(min) + } + + fn access_metrics(&self) -> AccessMetrics { + self.access_recorder().get_metrics() } fn time_of_last_write(&self) -> Option> { @@ -310,22 +327,6 @@ impl LifecycleChunk for CatalogChunk { fn row_count(&self) -> usize { self.storage().0 } - - fn min_timestamp(&self) -> DateTime { - let table_summary = self.table_summary(); - let col = table_summary - .columns - .iter() - .find(|x| x.name == TIME_COLUMN_NAME) - .expect("time column expected"); - - let min = match &col.stats { - Statistics::I64(stats) => stats.min.expect("time column cannot be empty"), - _ => panic!("unexpected time column type"), - }; - - Utc.timestamp_nanos(min) - } } /// Executes a plan and collects the results into a read buffer chunk