From 8add00e761bd6de061bb234ed794d9459f9ac293 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Jul 2021 14:27:47 -0400 Subject: [PATCH] feat: Make CatalogChunk first/last write times required Connects to #1927. --- data_types/src/chunk_metadata.rs | 74 ++------- generated_types/src/chunk.rs | 35 +++-- lifecycle/src/lib.rs | 2 +- lifecycle/src/policy.rs | 204 +++++++++++-------------- server/src/db.rs | 158 ++++++++++--------- server/src/db/catalog/chunk.rs | 47 +++--- server/src/db/chunk.rs | 4 +- server/src/db/lifecycle.rs | 2 +- server/src/db/lifecycle/compact.rs | 26 ++-- server/src/db/lifecycle/persist.rs | 18 +-- server/src/db/system_tables/chunks.rs | 32 ++-- server/src/db/system_tables/columns.rs | 13 +- 12 files changed, 282 insertions(+), 333 deletions(-) diff --git a/data_types/src/chunk_metadata.rs b/data_types/src/chunk_metadata.rs index 7e4c19623d..67d3dc1fbf 100644 --- a/data_types/src/chunk_metadata.rs +++ b/data_types/src/chunk_metadata.rs @@ -140,12 +140,12 @@ pub struct ChunkSummary { /// The earliest time at which data contained within this chunk was written /// into IOx. Note due to the compaction, etc... this may not be the chunk /// that data was originally written into - pub time_of_first_write: Option>, + pub time_of_first_write: DateTime, /// The latest time at which data contained within this chunk was written /// into IOx. Note due to the compaction, etc... this may not be the chunk /// that data was originally written into - pub time_of_last_write: Option>, + pub time_of_last_write: DateTime, /// Time at which this chunk was marked as closed. Note this is /// not the same as the timestamps on the data itself @@ -173,66 +173,14 @@ pub struct DetailedChunkSummary { } impl ChunkSummary { - /// Construct a ChunkSummary that has None for all timestamps - #[allow(clippy::too_many_arguments)] - pub fn new_without_timestamps( - partition_key: Arc, - table_name: Arc, - id: u32, - storage: ChunkStorage, - lifecycle_action: Option, - memory_bytes: usize, - object_store_bytes: usize, - row_count: usize, - ) -> Self { - Self { - partition_key, - table_name, - id, - storage, - lifecycle_action, - memory_bytes, - object_store_bytes, - row_count, - time_of_last_access: None, - time_of_first_write: None, - time_of_last_write: None, - time_closed: None, - } - } - - /// Return a new ChunkSummary with None for all timestamps - pub fn normalize(self) -> Self { - let ChunkSummary { - partition_key, - table_name, - id, - storage, - lifecycle_action, - memory_bytes, - object_store_bytes, - row_count, - .. - } = self; - Self::new_without_timestamps( - partition_key, - table_name, - id, - storage, - lifecycle_action, - memory_bytes, - object_store_bytes, - row_count, - ) - } - - /// Normalizes a set of ChunkSummaries for comparison by removing timestamps - pub fn normalize_summaries(summaries: Vec) -> Vec { - let mut summaries = summaries - .into_iter() - .map(|summary| summary.normalize()) - .collect::>(); - summaries.sort_unstable(); - summaries + pub fn equal_without_timestamps(&self, other: &Self) -> bool { + self.partition_key == other.partition_key + && self.table_name == other.table_name + && self.id == other.id + && self.storage == other.storage + && self.lifecycle_action == other.lifecycle_action + && self.memory_bytes == other.memory_bytes + && self.object_store_bytes == other.object_store_bytes + && self.row_count == other.row_count } } diff --git a/generated_types/src/chunk.rs b/generated_types/src/chunk.rs index 05700d2449..c3947197e2 100644 --- a/generated_types/src/chunk.rs +++ b/generated_types/src/chunk.rs @@ -36,8 +36,8 @@ impl From for management::Chunk { object_store_bytes: object_store_bytes as u64, row_count: row_count as u64, time_of_last_access: time_of_last_access.map(Into::into), - time_of_first_write: time_of_first_write.map(Into::into), - time_of_last_write: time_of_last_write.map(Into::into), + time_of_first_write: Some(time_of_first_write.into()), + time_of_last_write: Some(time_of_last_write.into()), time_closed: time_closed.map(Into::into), } } @@ -83,6 +83,15 @@ impl TryFrom for ChunkSummary { t.map(|t| convert_timestamp(t, field)).transpose() }; + let required_timestamp = |t: Option, + field: &'static str| { + t.ok_or_else(|| FieldViolation { + field: field.to_string(), + description: "Timestamp is required".to_string(), + }) + .and_then(|t| convert_timestamp(t, field)) + }; + let management::Chunk { partition_key, table_name, @@ -109,8 +118,8 @@ impl TryFrom for ChunkSummary { object_store_bytes: object_store_bytes as usize, row_count: row_count as usize, time_of_last_access: timestamp(time_of_last_access, "time_of_last_access")?, - time_of_first_write: timestamp(time_of_first_write, "time_of_first_write")?, - time_of_last_write: timestamp(time_of_last_write, "time_of_last_write")?, + time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?, + time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?, time_closed: timestamp(time_closed, "time_closed")?, }) } @@ -158,6 +167,7 @@ mod test { #[test] fn valid_proto_to_summary() { + let now = Utc::now(); let proto = management::Chunk { partition_key: "foo".to_string(), table_name: "bar".to_string(), @@ -168,8 +178,8 @@ mod test { storage: management::ChunkStorage::ObjectStoreOnly.into(), lifecycle_action: management::ChunkLifecycleAction::Moving.into(), - time_of_first_write: None, - time_of_last_write: None, + time_of_first_write: Some(now.into()), + time_of_last_write: Some(now.into()), time_closed: None, time_of_last_access: Some(google_types::protobuf::Timestamp { seconds: 50, @@ -187,8 +197,8 @@ mod test { row_count: 321, storage: ChunkStorage::ObjectStoreOnly, lifecycle_action: Some(ChunkLifecycleAction::Moving), - time_of_first_write: None, - time_of_last_write: None, + time_of_first_write: now, + time_of_last_write: now, time_closed: None, time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)), }; @@ -202,6 +212,7 @@ mod test { #[test] fn valid_summary_to_proto() { + let now = Utc::now(); let summary = ChunkSummary { partition_key: Arc::from("foo"), table_name: Arc::from("bar"), @@ -211,8 +222,8 @@ mod test { row_count: 321, storage: ChunkStorage::ObjectStoreOnly, lifecycle_action: Some(ChunkLifecycleAction::Persisting), - time_of_first_write: None, - time_of_last_write: None, + time_of_first_write: now, + time_of_last_write: now, time_closed: None, time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)), }; @@ -228,8 +239,8 @@ mod test { row_count: 321, storage: management::ChunkStorage::ObjectStoreOnly.into(), lifecycle_action: management::ChunkLifecycleAction::Persisting.into(), - time_of_first_write: None, - time_of_last_write: None, + time_of_first_write: Some(now.into()), + time_of_last_write: Some(now.into()), time_closed: None, time_of_last_access: Some(google_types::protobuf::Timestamp { seconds: 12, diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs index 3a76aaf48a..68be41b25c 100644 --- a/lifecycle/src/lib.rs +++ b/lifecycle/src/lib.rs @@ -178,7 +178,7 @@ pub trait LifecycleChunk { /// Returns the access metrics for this chunk fn access_metrics(&self) -> AccessMetrics; - fn time_of_last_write(&self) -> Option>; + fn time_of_last_write(&self) -> DateTime; fn addr(&self) -> &ChunkAddr; diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index ffa56e6ea2..fd6ed8dbb0 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -614,16 +614,7 @@ fn can_move(rules: &LifecycleRules, chunk: &C, now: DateTime< return true; } - match chunk.time_of_last_write() { - Some(last_write) - if elapsed_seconds(now, last_write) >= rules.late_arrive_window_seconds.get() => - { - true - } - - // Disable movement the chunk is empty, or the linger hasn't expired - _ => false, - } + elapsed_seconds(now, chunk.time_of_last_write()) >= rules.late_arrive_window_seconds.get() } /// An action to free up memory @@ -710,13 +701,13 @@ mod tests { row_count: usize, min_timestamp: Option>, access_metrics: AccessMetrics, - time_of_last_write: Option>, + time_of_last_write: DateTime, lifecycle_action: Option>, storage: ChunkStorage, } impl TestChunk { - fn new(id: u32, time_of_last_write: Option, storage: ChunkStorage) -> Self { + fn new(id: u32, time_of_last_write: i64, storage: ChunkStorage) -> Self { let addr = ChunkAddr { db_name: Arc::from(""), table_name: Arc::from(""), @@ -732,7 +723,7 @@ mod tests { count: 0, last_instant: Instant::now(), }, - time_of_last_write: time_of_last_write.map(from_secs), + time_of_last_write: from_secs(time_of_last_write), lifecycle_action: None, storage, } @@ -836,7 +827,7 @@ mod tests { let id = partition.next_id; partition.next_id += 1; - let mut new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer); + let mut new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer); new_chunk.row_count = 0; for chunk in &chunks { @@ -876,7 +867,7 @@ mod tests { partition.next_id += 1; // The remainder left behind after the split - let new_chunk = TestChunk::new(id, None, ChunkStorage::ReadBuffer) + let new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer) .with_min_timestamp(handle.timestamp + chrono::Duration::nanoseconds(1)); partition @@ -973,7 +964,7 @@ mod tests { self.access_metrics.clone() } - fn time_of_last_write(&self) -> Option> { + fn time_of_last_write(&self) -> DateTime { self.time_of_last_write } @@ -1094,20 +1085,20 @@ mod tests { mub_row_threshold: NonZeroUsize::new(74).unwrap(), ..Default::default() }; - let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer); + let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer); assert!(!can_move(&rules, &chunk, from_secs(9))); assert!(can_move(&rules, &chunk, from_secs(11))); // can move even if the chunk is small - let chunk = TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer).with_row_count(73); + let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(73); assert!(can_move(&rules, &chunk, from_secs(11))); // If over the row count threshold, we should be able to move - let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(74); + let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(74); assert!(can_move(&rules, &chunk, from_secs(0))); // If below the default row count threshold, it shouldn't move - let chunk = TestChunk::new(0, None, ChunkStorage::OpenMutableBuffer).with_row_count(73); + let chunk = TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer).with_row_count(73); assert!(!can_move(&rules, &chunk, from_secs(0))); } @@ -1167,9 +1158,9 @@ mod tests { // The default rules shouldn't do anything let rules = LifecycleRules::default(); let chunks = vec![ - TestChunk::new(0, Some(1), ChunkStorage::OpenMutableBuffer), - TestChunk::new(1, Some(1), ChunkStorage::OpenMutableBuffer), - TestChunk::new(2, Some(1), ChunkStorage::OpenMutableBuffer), + TestChunk::new(0, 1, ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, 1, ChunkStorage::OpenMutableBuffer), + TestChunk::new(2, 1, ChunkStorage::OpenMutableBuffer), ]; let db = TestDb::new(rules, chunks); @@ -1185,9 +1176,9 @@ mod tests { ..Default::default() }; let chunks = vec![ - TestChunk::new(0, Some(8), ChunkStorage::OpenMutableBuffer), - TestChunk::new(1, Some(5), ChunkStorage::OpenMutableBuffer), - TestChunk::new(2, Some(0), ChunkStorage::OpenMutableBuffer), + TestChunk::new(0, 8, ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, 5, ChunkStorage::OpenMutableBuffer), + TestChunk::new(2, 0, ChunkStorage::OpenMutableBuffer), ]; let db = TestDb::new(rules, chunks); @@ -1262,7 +1253,7 @@ mod tests { ..Default::default() }; - let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; + let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)]; let db = TestDb::new(rules.clone(), chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1272,29 +1263,30 @@ mod tests { let instant = Instant::now(); - let chunks = - vec![ - // two "open" chunks => they must not be dropped (yet) - TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer), - TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer), - // "moved" chunk => can be dropped because `drop_non_persistent=true` - TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer), - // "writing" chunk => cannot be unloaded while write is in-progress - TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) - .with_action(ChunkLifecycleAction::Persisting), - // "written" chunk => can be unloaded - TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore) - .with_access_metrics(AccessMetrics { - count: 1, - last_instant: instant, - }), - // "written" chunk => can be unloaded - TestChunk::new(5, Some(0), ChunkStorage::ReadBufferAndObjectStore) - .with_access_metrics(AccessMetrics { - count: 12, - last_instant: instant - Duration::from_secs(1), - }), - ]; + let chunks = vec![ + // two "open" chunks => they must not be dropped (yet) + TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, 0, ChunkStorage::OpenMutableBuffer), + // "moved" chunk => can be dropped because `drop_non_persistent=true` + TestChunk::new(2, 0, ChunkStorage::ReadBuffer), + // "writing" chunk => cannot be unloaded while write is in-progress + TestChunk::new(3, 0, ChunkStorage::ReadBuffer) + .with_action(ChunkLifecycleAction::Persisting), + // "written" chunk => can be unloaded + TestChunk::new(4, 0, ChunkStorage::ReadBufferAndObjectStore).with_access_metrics( + AccessMetrics { + count: 1, + last_instant: instant, + }, + ), + // "written" chunk => can be unloaded + TestChunk::new(5, 0, ChunkStorage::ReadBufferAndObjectStore).with_access_metrics( + AccessMetrics { + count: 12, + last_instant: instant - Duration::from_secs(1), + }, + ), + ]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1324,7 +1316,7 @@ mod tests { }; assert!(!rules.drop_non_persisted); - let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; + let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)]; let db = TestDb::new(rules.clone(), chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1334,15 +1326,15 @@ mod tests { let chunks = vec![ // two "open" chunks => they must not be dropped (yet) - TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer), - TestChunk::new(1, Some(0), ChunkStorage::OpenMutableBuffer), + TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, 0, ChunkStorage::OpenMutableBuffer), // "moved" chunk => cannot be dropped because `drop_non_persistent=false` - TestChunk::new(2, Some(0), ChunkStorage::ReadBuffer), + TestChunk::new(2, 0, ChunkStorage::ReadBuffer), // "writing" chunk => cannot be drop while write is in-progess - TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) + TestChunk::new(3, 0, ChunkStorage::ReadBuffer) .with_action(ChunkLifecycleAction::Persisting), // "written" chunk => can be unloaded - TestChunk::new(4, Some(0), ChunkStorage::ReadBufferAndObjectStore), + TestChunk::new(4, 0, ChunkStorage::ReadBufferAndObjectStore), ]; let db = TestDb::new(rules, chunks); @@ -1360,7 +1352,7 @@ mod tests { ..Default::default() }; - let chunks = vec![TestChunk::new(0, Some(0), ChunkStorage::OpenMutableBuffer)]; + let chunks = vec![TestChunk::new(0, 0, ChunkStorage::OpenMutableBuffer)]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1383,55 +1375,55 @@ mod tests { let partitions = vec![ TestPartition::new(vec![ // still receiving writes => cannot compact - TestChunk::new(0, Some(20), ChunkStorage::OpenMutableBuffer), + TestChunk::new(0, 20, ChunkStorage::OpenMutableBuffer), ]), TestPartition::new(vec![ // still receiving writes => cannot compact - TestChunk::new(1, Some(20), ChunkStorage::OpenMutableBuffer), + TestChunk::new(1, 20, ChunkStorage::OpenMutableBuffer), // closed => can compact - TestChunk::new(2, Some(20), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(2, 20, ChunkStorage::ClosedMutableBuffer), ]), TestPartition::new(vec![ // open but cold => can compact - TestChunk::new(3, Some(5), ChunkStorage::OpenMutableBuffer), + TestChunk::new(3, 5, ChunkStorage::OpenMutableBuffer), // closed => can compact - TestChunk::new(4, Some(20), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(4, 20, ChunkStorage::ClosedMutableBuffer), // closed => can compact - TestChunk::new(5, Some(20), ChunkStorage::ReadBuffer), + TestChunk::new(5, 20, ChunkStorage::ReadBuffer), // persisted => cannot compact - TestChunk::new(6, Some(20), ChunkStorage::ReadBufferAndObjectStore), + TestChunk::new(6, 20, ChunkStorage::ReadBufferAndObjectStore), // persisted => cannot compact - TestChunk::new(7, Some(20), ChunkStorage::ObjectStoreOnly), + TestChunk::new(7, 20, ChunkStorage::ObjectStoreOnly), ]), TestPartition::new(vec![ // closed => can compact - TestChunk::new(8, Some(20), ChunkStorage::ReadBuffer), + TestChunk::new(8, 20, ChunkStorage::ReadBuffer), // closed => can compact - TestChunk::new(9, Some(20), ChunkStorage::ReadBuffer), + TestChunk::new(9, 20, ChunkStorage::ReadBuffer), // persisted => cannot compact - TestChunk::new(10, Some(20), ChunkStorage::ReadBufferAndObjectStore), + TestChunk::new(10, 20, ChunkStorage::ReadBufferAndObjectStore), // persisted => cannot compact - TestChunk::new(11, Some(20), ChunkStorage::ObjectStoreOnly), + TestChunk::new(11, 20, ChunkStorage::ObjectStoreOnly), ]), TestPartition::new(vec![ // open but cold => can compact - TestChunk::new(12, Some(5), ChunkStorage::OpenMutableBuffer), + TestChunk::new(12, 5, ChunkStorage::OpenMutableBuffer), ]), TestPartition::new(vec![ // already compacted => should not compact - TestChunk::new(13, Some(5), ChunkStorage::ReadBuffer), + TestChunk::new(13, 5, ChunkStorage::ReadBuffer), ]), TestPartition::new(vec![ // closed => can compact - TestChunk::new(14, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), + TestChunk::new(14, 20, ChunkStorage::ReadBuffer).with_row_count(400), // too many individual rows => ignore - TestChunk::new(15, Some(20), ChunkStorage::ReadBuffer).with_row_count(1_000), + TestChunk::new(15, 20, ChunkStorage::ReadBuffer).with_row_count(1_000), // closed => can compact - TestChunk::new(16, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), + TestChunk::new(16, 20, ChunkStorage::ReadBuffer).with_row_count(400), // too many total rows => next compaction job - TestChunk::new(17, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), + TestChunk::new(17, 20, ChunkStorage::ReadBuffer).with_row_count(400), // too many total rows => next compaction job - TestChunk::new(18, Some(20), ChunkStorage::ReadBuffer).with_row_count(400), + TestChunk::new(18, 20, ChunkStorage::ReadBuffer).with_row_count(400), ]), ]; @@ -1466,19 +1458,19 @@ mod tests { let partitions = vec![ TestPartition::new(vec![ // closed => can compact - TestChunk::new(0, Some(20), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(0, 20, ChunkStorage::ClosedMutableBuffer), // closed => can compact - TestChunk::new(10, Some(30), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(10, 30, ChunkStorage::ClosedMutableBuffer), // closed => can compact - TestChunk::new(12, Some(40), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(12, 40, ChunkStorage::ClosedMutableBuffer), ]), TestPartition::new(vec![ // closed => can compact - TestChunk::new(1, Some(20), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(1, 20, ChunkStorage::ClosedMutableBuffer), ]), TestPartition::new(vec![ // closed => can compact - TestChunk::new(200, Some(10), ChunkStorage::ClosedMutableBuffer), + TestChunk::new(200, 10, ChunkStorage::ClosedMutableBuffer), ]), ]; @@ -1516,65 +1508,59 @@ mod tests { let partitions = vec![ // Insufficient rows and not old enough => don't persist but can compact TestPartition::new(vec![ - TestChunk::new(0, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(0, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(1, Some(0), ChunkStorage::ReadBuffer) - .with_min_timestamp(from_secs(5)), + TestChunk::new(1, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)), ]) .with_persistence(10, now, from_secs(20)), // Sufficient rows => persist TestPartition::new(vec![ - TestChunk::new(2, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(2, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(3, Some(0), ChunkStorage::ReadBuffer) - .with_min_timestamp(from_secs(5)), + TestChunk::new(3, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)), ]) .with_persistence(1_000, now, from_secs(20)), // Writes too old => persist TestPartition::new(vec![ // Should split open chunks - TestChunk::new(4, Some(20), ChunkStorage::OpenMutableBuffer) + TestChunk::new(4, 20, ChunkStorage::OpenMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(5, Some(0), ChunkStorage::ReadBuffer) - .with_min_timestamp(from_secs(5)), - TestChunk::new(6, Some(0), ChunkStorage::ObjectStoreOnly) + TestChunk::new(5, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)), + TestChunk::new(6, 0, ChunkStorage::ObjectStoreOnly) .with_min_timestamp(from_secs(5)), ]) .with_persistence(10, now - Duration::from_secs(10), from_secs(20)), // Sufficient rows but conflicting compaction => prevent compaction TestPartition::new(vec![ - TestChunk::new(7, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(7, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)) .with_action(ChunkLifecycleAction::Compacting), // This chunk would be a compaction candidate, but we want to persist it - TestChunk::new(8, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(8, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(9, Some(0), ChunkStorage::ReadBuffer) - .with_min_timestamp(from_secs(5)), + TestChunk::new(9, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)), ]) .with_persistence(1_000, now, from_secs(20)), // Sufficient rows and non-conflicting compaction => persist TestPartition::new(vec![ - TestChunk::new(10, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(10, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(21)) .with_action(ChunkLifecycleAction::Compacting), - TestChunk::new(11, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(11, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(12, Some(0), ChunkStorage::ReadBuffer) - .with_min_timestamp(from_secs(5)), + TestChunk::new(12, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)), ]) .with_persistence(1_000, now, from_secs(20)), // Sufficient rows, non-conflicting compaction and compact-able chunk => persist + compact TestPartition::new(vec![ - TestChunk::new(13, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(13, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(21)) .with_action(ChunkLifecycleAction::Compacting), - TestChunk::new(14, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(14, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(21)), - TestChunk::new(15, Some(0), ChunkStorage::ClosedMutableBuffer) + TestChunk::new(15, 0, ChunkStorage::ClosedMutableBuffer) .with_min_timestamp(from_secs(10)), - TestChunk::new(16, Some(0), ChunkStorage::ReadBuffer) - .with_min_timestamp(from_secs(5)), + TestChunk::new(16, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)), ]) .with_persistence(1_000, now, from_secs(20)), ]; @@ -1604,7 +1590,7 @@ mod tests { late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), ..Default::default() }; - let chunks = vec![TestChunk::new(0, Some(40), ChunkStorage::OpenMutableBuffer)]; + let chunks = vec![TestChunk::new(0, 40, ChunkStorage::OpenMutableBuffer)]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1619,11 +1605,7 @@ mod tests { late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), ..Default::default() }; - let chunks = vec![TestChunk::new( - 0, - Some(40), - ChunkStorage::ClosedMutableBuffer, - )]; + let chunks = vec![TestChunk::new(0, 40, ChunkStorage::ClosedMutableBuffer)]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); @@ -1635,7 +1617,7 @@ mod tests { #[test] fn test_recovers_lifecycle_action() { let rules = LifecycleRules::default(); - let chunks = vec![TestChunk::new(0, None, ChunkStorage::ClosedMutableBuffer)]; + let chunks = vec![TestChunk::new(0, 0, ChunkStorage::ClosedMutableBuffer)]; let db = TestDb::new(rules, chunks); let mut lifecycle = LifecyclePolicy::new(&db); diff --git a/server/src/db.rs b/server/src/db.rs index a0124ee779..40fd50270f 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1332,7 +1332,7 @@ mod tests { use arrow::record_batch::RecordBatch; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use bytes::Bytes; - use chrono::DateTime; + use chrono::{DateTime, TimeZone}; use data_types::{ chunk_metadata::ChunkStorage, database_rules::{LifecycleRules, PartitionTemplate, TemplatePart}, @@ -2629,10 +2629,7 @@ mod tests { let chunk = partition.open_chunk().unwrap(); let chunk = chunk.read(); - ( - partition.last_write_at(), - chunk.time_of_last_write().unwrap(), - ) + (partition.last_write_at(), chunk.time_of_last_write()) }; let entry = lp_to_entry("cpu bar=true 10"); @@ -2644,7 +2641,7 @@ mod tests { assert_eq!(last_write_prev, partition.last_write_at()); let chunk = partition.open_chunk().unwrap(); let chunk = chunk.read(); - assert_eq!(chunk_last_write_prev, chunk.time_of_last_write().unwrap()); + assert_eq!(chunk_last_write_prev, chunk.time_of_last_write()); } } @@ -2788,9 +2785,9 @@ mod tests { println!("Chunk: {:#?}", chunk); // then the chunk creation and rollover times are as expected - assert!(start < chunk.time_of_first_write().unwrap()); - assert!(chunk.time_of_first_write().unwrap() < after_data_load); - assert!(chunk.time_of_first_write().unwrap() == chunk.time_of_last_write().unwrap()); + assert!(start < chunk.time_of_first_write()); + assert!(chunk.time_of_first_write() < after_data_load); + assert!(chunk.time_of_first_write() == chunk.time_of_last_write()); assert!(after_data_load < chunk.time_closed().unwrap()); assert!(chunk.time_closed().unwrap() < after_rollover); } @@ -2850,18 +2847,21 @@ mod tests { print!("Partitions: {:?}", db.partition_keys().unwrap()); let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15"); - let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries); - let expected = vec![ChunkSummary::new_without_timestamps( - Arc::from("1970-01-05T15"), - Arc::from("cpu"), - 0, - ChunkStorage::OpenMutableBuffer, - None, - 70, // memory_size - 0, // os_size - 1, - )]; + let expected = vec![ChunkSummary { + partition_key: Arc::from("1970-01-05T15"), + table_name: Arc::from("cpu"), + id: 0, + storage: ChunkStorage::OpenMutableBuffer, + lifecycle_action: None, + memory_bytes: 70, // memory_size + object_store_bytes: 0, // os_size + row_count: 1, + time_of_last_access: None, + time_of_first_write: Utc.timestamp_nanos(1), + time_of_last_write: Utc.timestamp_nanos(1), + time_closed: None, + }]; let size: usize = db .chunk_summaries() @@ -2872,11 +2872,14 @@ mod tests { assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size); - assert_eq!( - expected, chunk_summaries, - "expected:\n{:#?}\n\nactual:{:#?}\n\n", - expected, chunk_summaries - ); + for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) { + assert!( + expected_summary.equal_without_timestamps(&actual_summary), + "expected:\n{:#?}\n\nactual:{:#?}\n\n", + expected_summary, + actual_summary + ); + } } #[tokio::test] @@ -2896,23 +2899,23 @@ mod tests { let summary = &chunk_summaries[0]; assert_eq!(summary.id, 0, "summary; {:#?}", summary); assert!( - summary.time_of_first_write.unwrap() > start, + summary.time_of_first_write > start, "summary; {:#?}", summary ); assert!( - summary.time_of_first_write.unwrap() < after_close, + summary.time_of_first_write < after_close, "summary; {:#?}", summary ); assert!( - summary.time_of_last_write.unwrap() > after_first_write, + summary.time_of_last_write > after_first_write, "summary; {:#?}", summary ); assert!( - summary.time_of_last_write.unwrap() < after_close, + summary.time_of_last_write < after_close, "summary; {:#?}", summary ); @@ -2930,8 +2933,8 @@ mod tests { } fn assert_first_last_times_eq(chunk_summary: &ChunkSummary) { - let first_write = chunk_summary.time_of_first_write.unwrap(); - let last_write = chunk_summary.time_of_last_write.unwrap(); + let first_write = chunk_summary.time_of_first_write; + let last_write = chunk_summary.time_of_last_write; assert_eq!(first_write, last_write); } @@ -2941,8 +2944,8 @@ mod tests { before: DateTime, after: DateTime, ) { - let first_write = chunk_summary.time_of_first_write.unwrap(); - let last_write = chunk_summary.time_of_last_write.unwrap(); + let first_write = chunk_summary.time_of_first_write; + let last_write = chunk_summary.time_of_last_write; assert!(before < first_write); assert!(before < last_write); @@ -2951,8 +2954,8 @@ mod tests { } fn assert_chunks_times_ordered(before: &ChunkSummary, after: &ChunkSummary) { - let before_last_write = before.time_of_last_write.unwrap(); - let after_first_write = after.time_of_first_write.unwrap(); + let before_last_write = before.time_of_last_write; + let after_first_write = after.time_of_first_write; assert!(before_last_write < after_first_write); } @@ -2963,14 +2966,14 @@ mod tests { } fn assert_chunks_first_times_eq(a: &ChunkSummary, b: &ChunkSummary) { - let a_first_write = a.time_of_first_write.unwrap(); - let b_first_write = b.time_of_first_write.unwrap(); + let a_first_write = a.time_of_first_write; + let b_first_write = b.time_of_first_write; assert_eq!(a_first_write, b_first_write); } fn assert_chunks_last_times_eq(a: &ChunkSummary, b: &ChunkSummary) { - let a_last_write = a.time_of_last_write.unwrap(); - let b_last_write = b.time_of_last_write.unwrap(); + let a_last_write = a.time_of_last_write; + let b_last_write = b.time_of_last_write; assert_eq!(a_last_write, b_last_write); } @@ -3132,49 +3135,62 @@ mod tests { assert_first_last_times_eq(&open_mb_t8); assert_first_last_times_between(&open_mb_t8, time7, time8); - let chunk_summaries = ChunkSummary::normalize_summaries(chunk_summaries); - let lifecycle_action = None; let expected = vec![ - ChunkSummary::new_without_timestamps( - Arc::from("1970-01-01T00"), - Arc::from("cpu"), - 2, - ChunkStorage::ReadBufferAndObjectStore, + ChunkSummary { + partition_key: Arc::from("1970-01-01T00"), + table_name: Arc::from("cpu"), + id: 2, + storage: ChunkStorage::ReadBufferAndObjectStore, lifecycle_action, - 3332, // size of RB and OS chunks - 1523, // size of parquet file - 2, - ), - ChunkSummary::new_without_timestamps( - Arc::from("1970-01-05T15"), - Arc::from("cpu"), - 0, - ChunkStorage::ClosedMutableBuffer, + memory_bytes: 3332, // size of RB and OS chunks + object_store_bytes: 1523, // size of parquet file + row_count: 2, + time_of_last_access: None, + time_of_first_write: Utc.timestamp_nanos(1), + time_of_last_write: Utc.timestamp_nanos(1), + time_closed: None, + }, + ChunkSummary { + partition_key: Arc::from("1970-01-05T15"), + table_name: Arc::from("cpu"), + id: 0, + storage: ChunkStorage::ClosedMutableBuffer, lifecycle_action, - 2510, - 0, // no OS chunks - 1, - ), - ChunkSummary::new_without_timestamps( - Arc::from("1970-01-05T15"), - Arc::from("cpu"), - 1, - ChunkStorage::OpenMutableBuffer, + memory_bytes: 2510, + object_store_bytes: 0, // no OS chunks + row_count: 1, + time_of_last_access: None, + time_of_first_write: Utc.timestamp_nanos(1), + time_of_last_write: Utc.timestamp_nanos(1), + time_closed: None, + }, + ChunkSummary { + partition_key: Arc::from("1970-01-05T15"), + table_name: Arc::from("cpu"), + id: 1, + storage: ChunkStorage::OpenMutableBuffer, lifecycle_action, - 87, - 0, // no OS chunks - 1, - ), + memory_bytes: 87, + object_store_bytes: 0, // no OS chunks + row_count: 1, + time_of_last_access: None, + time_of_first_write: Utc.timestamp_nanos(1), + time_of_last_write: Utc.timestamp_nanos(1), + time_closed: None, + }, ]; for (expected_summary, actual_summary) in expected.iter().zip(chunk_summaries.iter()) { - assert_eq!( - expected_summary, actual_summary, + assert!( + expected_summary.equal_without_timestamps(&actual_summary), "\n\nexpected item:\n{:#?}\n\nactual item:\n{:#?}\n\n\ all expected:\n{:#?}\n\nall actual:\n{:#?}", - expected_summary, actual_summary, expected, chunk_summaries + expected_summary, + actual_summary, + expected, + chunk_summaries ); } diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index c1966fa4e3..b1923b6918 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -200,12 +200,12 @@ pub struct CatalogChunk { /// The earliest time at which data contained within this chunk was written /// into IOx. Note due to the compaction, etc... this may not be the chunk /// that data was originally written into - time_of_first_write: Option>, + time_of_first_write: DateTime, /// The latest time at which data contained within this chunk was written /// into IOx. Note due to the compaction, etc... this may not be the chunk /// that data was originally written into - time_of_last_write: Option>, + time_of_last_write: DateTime, /// Time at which this chunk was marked as closed. Note this is /// not the same as the timestamps on the data itself @@ -269,8 +269,9 @@ impl CatalogChunk { ) -> Self { assert_eq!(chunk.table_name(), &addr.table_name); - let first_write = chunk.table_summary().time_of_first_write; - let last_write = chunk.table_summary().time_of_last_write; + let summary = chunk.table_summary(); + let time_of_first_write = summary.time_of_first_write; + let time_of_last_write = summary.time_of_last_write; let stage = ChunkStage::Open { mb_chunk: chunk }; @@ -284,8 +285,8 @@ impl CatalogChunk { lifecycle_action: None, metrics, access_recorder: Default::default(), - time_of_first_write: Some(first_write), - time_of_last_write: Some(last_write), + time_of_first_write, + time_of_last_write, time_closed: None, }; chunk.update_metrics(); @@ -302,8 +303,8 @@ impl CatalogChunk { metrics: ChunkMetrics, ) -> Self { let summary = chunk.table_summary(); - let first_write = summary.time_of_first_write; - let last_write = summary.time_of_last_write; + let time_of_first_write = summary.time_of_first_write; + let time_of_last_write = summary.time_of_last_write; let stage = ChunkStage::Frozen { meta: Arc::new(ChunkMetadata { @@ -323,8 +324,8 @@ impl CatalogChunk { lifecycle_action: None, metrics, access_recorder: Default::default(), - time_of_first_write: Some(first_write), - time_of_last_write: Some(last_write), + time_of_first_write, + time_of_last_write, time_closed: None, }; chunk.update_metrics(); @@ -341,8 +342,8 @@ impl CatalogChunk { assert_eq!(chunk.table_name(), addr.table_name.as_ref()); let summary = chunk.table_summary(); - let first_write = summary.time_of_first_write; - let last_write = summary.time_of_last_write; + let time_of_first_write = summary.time_of_first_write; + let time_of_last_write = summary.time_of_last_write; // this is temporary let table_summary = TableSummary { @@ -368,8 +369,8 @@ impl CatalogChunk { lifecycle_action: None, metrics, access_recorder: Default::default(), - time_of_first_write: Some(first_write), - time_of_last_write: Some(last_write), + time_of_first_write, + time_of_last_write, time_closed: None, }; chunk.update_metrics(); @@ -411,11 +412,11 @@ impl CatalogChunk { .map_or(false, |action| action.metadata() == &lifecycle_action) } - pub fn time_of_first_write(&self) -> Option> { + pub fn time_of_first_write(&self) -> DateTime { self.time_of_first_write } - pub fn time_of_last_write(&self) -> Option> { + pub fn time_of_last_write(&self) -> DateTime { self.time_of_last_write } @@ -478,18 +479,10 @@ impl CatalogChunk { self.metrics.timestamp_histogram.add(timestamps); self.access_recorder.record_access_now(); - if let Some(t) = self.time_of_first_write { - self.time_of_first_write = Some(t.min(time_of_write)) - } else { - self.time_of_first_write = Some(time_of_write) - } + self.time_of_first_write = self.time_of_first_write.min(time_of_write); // DateTime isn't necessarily monotonic - if let Some(t) = self.time_of_last_write { - self.time_of_last_write = Some(t.max(time_of_write)) - } else { - self.time_of_last_write = Some(time_of_write) - } + self.time_of_last_write = self.time_of_last_write.max(time_of_write); self.update_metrics(); } @@ -984,8 +977,6 @@ mod tests { let mb_chunk = make_mb_chunk(&addr.table_name); let chunk = CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered()); assert!(matches!(chunk.stage(), &ChunkStage::Open { .. })); - assert!(chunk.time_of_first_write.is_some()); - assert!(chunk.time_of_last_write.is_some()); } #[tokio::test] diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 32f3b559cb..22ffd15799 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -612,8 +612,8 @@ mod tests { let chunk = chunks.into_iter().next().unwrap(); let chunk = chunk.read(); assert_eq!(chunk.storage().1, ChunkStorage::ObjectStoreOnly); - let first_write = chunk.time_of_first_write().unwrap(); - let last_write = chunk.time_of_last_write().unwrap(); + let first_write = chunk.time_of_first_write(); + let last_write = chunk.time_of_last_write(); assert_eq!(first_write, last_write); assert!(before_creation < first_write); assert!(last_write < after_creation); diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 970f1361e6..808669da92 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -310,7 +310,7 @@ impl LifecycleChunk for CatalogChunk { self.access_recorder().get_metrics() } - fn time_of_last_write(&self) -> Option> { + fn time_of_last_write(&self) -> DateTime { self.time_of_last_write() } diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index 29a6f15f38..336276e089 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -48,17 +48,15 @@ pub(crate) fn compact_chunks( input_rows += chunk.table_summary().total_count(); - time_of_first_write = match (time_of_first_write, chunk.time_of_first_write()) { - (Some(prev_first), Some(candidate_first)) => Some(prev_first.min(candidate_first)), - (Some(only), None) | (None, Some(only)) => Some(only), - (None, None) => None, - }; + let candidate_first = chunk.time_of_first_write(); + time_of_first_write = time_of_first_write + .map(|prev_first| prev_first.min(candidate_first)) + .or(Some(candidate_first)); - time_of_last_write = match (time_of_last_write, chunk.time_of_last_write()) { - (Some(prev_last), Some(candidate_last)) => Some(prev_last.max(candidate_last)), - (Some(only), None) | (None, Some(only)) => Some(only), - (None, None) => None, - }; + let candidate_last = chunk.time_of_last_write(); + time_of_last_write = time_of_last_write + .map(|prev_last| prev_last.max(candidate_last)) + .or(Some(candidate_last)); chunk.set_compacting(®istration)?; Ok(DbChunk::snapshot(&*chunk)) @@ -179,15 +177,15 @@ mod tests { chunk_summaries.sort_unstable(); let mub_summary = &chunk_summaries[0]; - let first_mub_write = mub_summary.time_of_first_write.unwrap(); - let last_mub_write = mub_summary.time_of_last_write.unwrap(); + let first_mub_write = mub_summary.time_of_first_write; + let last_mub_write = mub_summary.time_of_last_write; assert!(time2 < first_mub_write); assert_eq!(first_mub_write, last_mub_write); assert!(first_mub_write < time3); let rub_summary = &chunk_summaries[1]; - let first_rub_write = rub_summary.time_of_first_write.unwrap(); - let last_rub_write = rub_summary.time_of_last_write.unwrap(); + let first_rub_write = rub_summary.time_of_first_write; + let last_rub_write = rub_summary.time_of_last_write; assert!(time0 < first_rub_write); assert!(first_rub_write < last_rub_write); assert!(last_rub_write < time1); diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs index c5679e7f0d..ab98e20aa6 100644 --- a/server/src/db/lifecycle/persist.rs +++ b/server/src/db/lifecycle/persist.rs @@ -54,17 +54,15 @@ pub fn persist_chunks( input_rows += chunk.table_summary().total_count(); - time_of_first_write = match (time_of_first_write, chunk.time_of_first_write()) { - (Some(prev_first), Some(candidate_first)) => Some(prev_first.min(candidate_first)), - (Some(only), None) | (None, Some(only)) => Some(only), - (None, None) => None, - }; + let candidate_first = chunk.time_of_first_write(); + time_of_first_write = time_of_first_write + .map(|prev_first| prev_first.min(candidate_first)) + .or(Some(candidate_first)); - time_of_last_write = match (time_of_last_write, chunk.time_of_last_write()) { - (Some(prev_last), Some(candidate_last)) => Some(prev_last.max(candidate_last)), - (Some(only), None) | (None, Some(only)) => Some(only), - (None, None) => None, - }; + let candidate_last = chunk.time_of_last_write(); + time_of_last_write = time_of_last_write + .map(|prev_last| prev_last.max(candidate_last)) + .or(Some(candidate_last)); chunk.set_writing_to_object_store(®istration)?; query_chunks.push(DbChunk::snapshot(&*chunk)); diff --git a/server/src/db/system_tables/chunks.rs b/server/src/db/system_tables/chunks.rs index f719dbc005..17a5da407d 100644 --- a/server/src/db/system_tables/chunks.rs +++ b/server/src/db/system_tables/chunks.rs @@ -48,15 +48,19 @@ fn chunk_summaries_schema() -> SchemaRef { Field::new("object_store_bytes", DataType::UInt64, false), Field::new("row_count", DataType::UInt64, false), Field::new("time_of_last_access", ts.clone(), true), - Field::new("time_of_first_write", ts.clone(), true), - Field::new("time_of_last_write", ts.clone(), true), + Field::new("time_of_first_write", ts.clone(), false), + Field::new("time_of_last_write", ts.clone(), false), Field::new("time_closed", ts, true), ])) } // TODO: Use a custom proc macro or serde to reduce the boilerplate -fn time_to_ts(time: Option>) -> Option { - time.map(|ts| ts.timestamp_nanos()) +fn optional_time_to_ts(time: Option>) -> Option { + time.and_then(time_to_ts) +} + +fn time_to_ts(ts: DateTime) -> Option { + Some(ts.timestamp_nanos()) } fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result { @@ -92,7 +96,7 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result< let time_of_last_access = chunks .iter() .map(|c| c.time_of_last_access) - .map(time_to_ts) + .map(optional_time_to_ts) .collect::(); let time_of_first_write = chunks .iter() @@ -107,7 +111,7 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec) -> Result< let time_closed = chunks .iter() .map(|c| c.time_closed) - .map(time_to_ts) + .map(optional_time_to_ts) .collect::(); RecordBatch::try_new( @@ -149,8 +153,8 @@ mod tests { object_store_bytes: 0, row_count: 11, time_of_last_access: None, - time_of_first_write: Some(Utc.timestamp_nanos(10_000_000_000)), - time_of_last_write: None, + time_of_first_write: Utc.timestamp_nanos(10_000_000_000), + time_of_last_write: Utc.timestamp_nanos(10_000_000_000), time_closed: None, }, ChunkSummary { @@ -163,8 +167,8 @@ mod tests { object_store_bytes: 0, row_count: 22, time_of_last_access: Some(Utc.timestamp_nanos(754_000_000_000)), - time_of_first_write: None, - time_of_last_write: Some(Utc.timestamp_nanos(80_000_000_000)), + time_of_first_write: Utc.timestamp_nanos(80_000_000_000), + time_of_last_write: Utc.timestamp_nanos(80_000_000_000), time_closed: None, }, ChunkSummary { @@ -177,8 +181,8 @@ mod tests { object_store_bytes: 5678, row_count: 33, time_of_last_access: Some(Utc.timestamp_nanos(5_000_000_000)), - time_of_first_write: Some(Utc.timestamp_nanos(100_000_000_000)), - time_of_last_write: Some(Utc.timestamp_nanos(200_000_000_000)), + time_of_first_write: Utc.timestamp_nanos(100_000_000_000), + time_of_last_write: Utc.timestamp_nanos(200_000_000_000), time_closed: None, }, ]; @@ -187,8 +191,8 @@ mod tests { "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+", "| id | partition_key | table_name | storage | lifecycle_action | memory_bytes | object_store_bytes | row_count | time_of_last_access | time_of_first_write | time_of_last_write | time_closed |", "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+", - "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | | |", - "| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | | 1970-01-01T00:01:20Z | |", + "| 0 | p1 | table1 | OpenMutableBuffer | | 23754 | | 11 | | 1970-01-01T00:00:10Z | 1970-01-01T00:00:10Z | |", + "| 1 | p1 | table1 | OpenMutableBuffer | Persisting to Object Storage | 23455 | | 22 | 1970-01-01T00:12:34Z | 1970-01-01T00:01:20Z | 1970-01-01T00:01:20Z | |", "| 2 | p1 | table1 | ObjectStoreOnly | | 1234 | 5678 | 33 | 1970-01-01T00:00:05Z | 1970-01-01T00:01:40Z | 1970-01-01T00:03:20Z | |", "+----+---------------+------------+-------------------+------------------------------+--------------+--------------------+-----------+----------------------+----------------------+----------------------+-------------+", ]; diff --git a/server/src/db/system_tables/columns.rs b/server/src/db/system_tables/columns.rs index f4db734bf2..4537d7f2e8 100644 --- a/server/src/db/system_tables/columns.rs +++ b/server/src/db/system_tables/columns.rs @@ -218,6 +218,7 @@ fn assemble_chunk_columns( mod tests { use super::*; use arrow_util::assert_batches_eq; + use chrono::{TimeZone, Utc}; use data_types::{ chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}, @@ -317,8 +318,8 @@ mod tests { object_store_bytes: 0, row_count: 11, time_of_last_access: None, - time_of_first_write: None, - time_of_last_write: None, + time_of_first_write: Utc.timestamp_nanos(1), + time_of_last_write: Utc.timestamp_nanos(2), time_closed: None, }, columns: vec![ @@ -353,8 +354,8 @@ mod tests { object_store_bytes: 0, row_count: 11, time_of_last_access: None, - time_of_first_write: None, - time_of_last_write: None, + time_of_first_write: Utc.timestamp_nanos(1), + time_of_last_write: Utc.timestamp_nanos(2), time_closed: None, }, columns: vec![ChunkColumnSummary { @@ -383,8 +384,8 @@ mod tests { object_store_bytes: 0, row_count: 11, time_of_last_access: None, - time_of_first_write: None, - time_of_last_write: None, + time_of_first_write: Utc.timestamp_nanos(1), + time_of_last_write: Utc.timestamp_nanos(2), time_closed: None, }, columns: vec![ChunkColumnSummary {