From 4689b5e4e517efc471ec2df217cec787db586811 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 26 Jul 2021 16:29:05 -0400 Subject: [PATCH] refactor: Remove first/last write times from MUB chunks --- mutable_buffer/src/chunk.rs | 76 +++------------------------ mutable_buffer/src/chunk/snapshot.rs | 6 +-- server/src/db.rs | 11 ++-- server/src/db/catalog/chunk.rs | 15 ++---- server/src/db/chunk.rs | 2 +- server_benchmarks/benches/snapshot.rs | 10 +--- server_benchmarks/benches/write.rs | 10 +--- 7 files changed, 26 insertions(+), 104 deletions(-) diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index ab5217916d..691cf4c408 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -3,8 +3,7 @@ use crate::{ column::{self, Column}, }; use arrow::record_batch::RecordBatch; -use chrono::{DateTime, Utc}; -use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummaryAndTimes}; +use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary}; use entry::TableBatch; use hashbrown::HashMap; use internal_types::{ @@ -84,25 +83,12 @@ pub struct MBChunk { /// Note: This is a mutex to allow mutation within /// `Chunk::snapshot()` which only takes an immutable borrow snapshot: Mutex>>, - - /// Time at which the first data was written into this chunk. Note - /// this is not the same as the timestamps on the data itself - time_of_first_write: DateTime, - - /// Most recent time at which data write was initiated into this - /// chunk. Note this is not the same as the timestamps on the data - /// itself - time_of_last_write: DateTime, } impl MBChunk { /// Create a new batch and write the contents of the [`TableBatch`] into it. Chunks /// shouldn't exist without some data. - pub fn new( - metrics: ChunkMetrics, - batch: TableBatch<'_>, - time_of_write: DateTime, - ) -> Result { + pub fn new(metrics: ChunkMetrics, batch: TableBatch<'_>) -> Result { let table_name = Arc::from(batch.name()); let mut chunk = Self { @@ -110,8 +96,6 @@ impl MBChunk { columns: Default::default(), metrics, snapshot: Mutex::new(None), - time_of_first_write: time_of_write, - time_of_last_write: time_of_write, }; let columns = batch.columns(); @@ -123,11 +107,7 @@ impl MBChunk { /// Write the contents of a [`TableBatch`] into this Chunk. /// /// Panics if the batch specifies a different name for the table in this Chunk - pub fn write_table_batch( - &mut self, - batch: TableBatch<'_>, - time_of_write: DateTime, - ) -> Result<()> { + pub fn write_table_batch(&mut self, batch: TableBatch<'_>) -> Result<()> { let table_name = batch.name(); assert_eq!( table_name, @@ -143,10 +123,6 @@ impl MBChunk { .try_lock() .expect("concurrent readers/writers to MBChunk") = None; - // DateTime is not necessarily monotonic - self.time_of_first_write = self.time_of_first_write.min(time_of_write); - self.time_of_last_write = self.time_of_last_write.max(time_of_write); - Ok(()) } @@ -227,7 +203,7 @@ impl MBChunk { } /// Returns a table summary for this chunk - pub fn table_summary(&self) -> TableSummaryAndTimes { + pub fn table_summary(&self) -> TableSummary { let mut columns: Vec<_> = self .columns .iter() @@ -245,11 +221,9 @@ impl MBChunk { columns.sort_by(|a, b| a.name.cmp(&b.name)); - TableSummaryAndTimes { + TableSummary { name: self.table_name.to_string(), columns, - time_of_first_write: self.time_of_first_write, - time_of_last_write: self.time_of_last_write, } } @@ -355,7 +329,6 @@ pub mod test_helpers { /// server id of 1. pub fn write_lp_to_chunk(lp: &str, chunk: &mut MBChunk) -> Result<()> { let entry = lp_to_entry(lp); - let time_of_write = Utc::now(); for w in entry.partition_writes().unwrap() { let table_batches = w.table_batches(); @@ -370,7 +343,7 @@ pub mod test_helpers { ); for batch in table_batches { - chunk.write_table_batch(batch, time_of_write)?; + chunk.write_table_batch(batch)?; } } @@ -379,7 +352,6 @@ pub mod test_helpers { pub fn write_lp_to_new_chunk(lp: &str) -> Result { let entry = lp_to_entry(lp); - let time_of_write = Utc::now(); let mut chunk: Option = None; for w in entry.partition_writes().unwrap() { @@ -396,13 +368,9 @@ pub mod test_helpers { for batch in table_batches { match chunk { - Some(ref mut c) => c.write_table_batch(batch, time_of_write)?, + Some(ref mut c) => c.write_table_batch(batch)?, None => { - chunk = Some(MBChunk::new( - ChunkMetrics::new_unregistered(), - batch, - time_of_write, - )?); + chunk = Some(MBChunk::new(ChunkMetrics::new_unregistered(), batch)?); } } } @@ -446,29 +414,11 @@ mod tests { #[test] fn writes_table_3_batches() { - let before_creation = Utc::now(); let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n"); let mut chunk = write_lp_to_new_chunk(&lp).unwrap(); - let after_creation = Utc::now(); - - // There was only one write so far, so first and last write times should be equal - let first_write = chunk.time_of_first_write; - assert_eq!(first_write, chunk.time_of_last_write); - - assert!(before_creation < first_write); - assert!(first_write < after_creation); let lp = vec!["cpu,host=c val=11 1"].join("\n"); write_lp_to_chunk(&lp, &mut chunk).unwrap(); - let after_write = Utc::now(); - - // Now the first and last times should be different - assert_ne!(chunk.time_of_first_write, chunk.time_of_last_write); - // The first write time should not have been updated - assert_eq!(chunk.time_of_first_write, first_write); - // The last write time should have been updated - assert!(after_creation < chunk.time_of_last_write); - assert!(chunk.time_of_last_write < after_write); let lp = vec!["cpu,host=a val=14 2"].join("\n"); write_lp_to_chunk(&lp, &mut chunk).unwrap(); @@ -489,8 +439,6 @@ mod tests { #[test] fn test_summary() { - let before_write = Utc::now(); - let lp = r#" cpu,host=a val=23 1 cpu,host=b,env=prod val=2 1 @@ -499,14 +447,6 @@ mod tests { "#; let chunk = write_lp_to_new_chunk(&lp).unwrap(); - let after_write = Utc::now(); - - // There was only one write, so first and last write times should be equal - assert_eq!(chunk.time_of_first_write, chunk.time_of_last_write); - - assert!(before_write < chunk.time_of_first_write); - assert!(chunk.time_of_first_write < after_write); - let summary = chunk.table_summary(); assert_eq!(summary.name, "cpu"); let expected_column_summaries = vec![ diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs index 4640352889..4ec4dfff6d 100644 --- a/mutable_buffer/src/chunk/snapshot.rs +++ b/mutable_buffer/src/chunk/snapshot.rs @@ -2,7 +2,7 @@ use super::MBChunk; use arrow::record_batch::RecordBatch; use data_types::{ error::ErrorLogger, - partition_metadata::{Statistics, TableSummaryAndTimes}, + partition_metadata::{Statistics, TableSummary}, timestamp::TimestampRange, }; use internal_types::{ @@ -31,7 +31,7 @@ pub struct ChunkSnapshot { schema: Arc, batch: RecordBatch, table_name: Arc, - summary: TableSummaryAndTimes, + summary: TableSummary, } impl ChunkSnapshot { @@ -100,7 +100,7 @@ impl ChunkSnapshot { } /// Returns a table summary for this chunk - pub fn table_summary(&self) -> &TableSummaryAndTimes { + pub fn table_summary(&self) -> &TableSummary { &self.summary } diff --git a/server/src/db.rs b/server/src/db.rs index db5e156a92..de27c318a8 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1111,9 +1111,8 @@ impl Db { let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk"); - if let Err(e) = mb_chunk - .write_table_batch(table_batch, time_of_write) - .context(WriteEntry { + if let Err(e) = + mb_chunk.write_table_batch(table_batch).context(WriteEntry { partition_key, chunk_id, }) @@ -1882,7 +1881,7 @@ mod tests { assert_metric("catalog_loaded_rows", "read_buffer", 0.0); assert_metric("catalog_loaded_rows", "object_store", 0.0); - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1319) + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1295) .unwrap(); db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0) @@ -3194,7 +3193,7 @@ mod tests { id: 0, storage: ChunkStorage::ClosedMutableBuffer, lifecycle_action, - memory_bytes: 2510, + memory_bytes: 2486, object_store_bytes: 0, // no OS chunks row_count: 1, time_of_last_access: None, @@ -3230,7 +3229,7 @@ mod tests { ); } - assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2510 + 87); + assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87); assert_eq!(db.catalog.metrics().memory().read_buffer(), 2434); assert_eq!(db.catalog.metrics().memory().object_store(), 898); } diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index afbc60cef4..613995476f 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -277,15 +277,10 @@ impl CatalogChunk { time_of_write: DateTime, metrics: ChunkMetrics, ) -> Result { - let chunk = - MBChunk::new(mb_chunk_metrics, batch, time_of_write).context(CreateOpenChunk)?; + let chunk = MBChunk::new(mb_chunk_metrics, batch).context(CreateOpenChunk)?; assert_eq!(chunk.table_name(), &addr.table_name); - let summary = chunk.table_summary(); - let time_of_first_write = summary.time_of_first_write; - let time_of_last_write = summary.time_of_last_write; - let stage = ChunkStage::Open { mb_chunk: chunk }; metrics @@ -298,8 +293,8 @@ impl CatalogChunk { lifecycle_action: None, metrics, access_recorder: Default::default(), - time_of_first_write, - time_of_last_write, + time_of_first_write: time_of_write, + time_of_last_write: time_of_write, time_closed: None, }; chunk.update_metrics(); @@ -596,7 +591,7 @@ impl CatalogChunk { match &self.stage { ChunkStage::Open { mb_chunk, .. } => { // The stats for open chunks change so can't be cached - Arc::new(mb_chunk.table_summary().into()) + Arc::new(mb_chunk.table_summary()) } ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.table_summary), ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.table_summary), @@ -665,7 +660,7 @@ impl CatalogChunk { // Cache table summary + schema let metadata = ChunkMetadata { - table_summary: Arc::new(mb_chunk.table_summary().into()), + table_summary: Arc::new(mb_chunk.table_summary()), schema: s.full_schema(), }; diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 1ec2d62684..85f884d049 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -114,7 +114,7 @@ impl DbChunk { chunk: Arc::clone(&snapshot), }; let meta = ChunkMetadata { - table_summary: Arc::new(mb_chunk.table_summary().into()), + table_summary: Arc::new(mb_chunk.table_summary()), schema: snapshot.full_schema(), }; (state, Arc::new(meta)) diff --git a/server_benchmarks/benches/snapshot.rs b/server_benchmarks/benches/snapshot.rs index 9725440ce7..3c63ebd70b 100644 --- a/server_benchmarks/benches/snapshot.rs +++ b/server_benchmarks/benches/snapshot.rs @@ -17,23 +17,17 @@ fn chunk(count: usize) -> MBChunk { let mut lp = String::new(); gz.read_to_string(&mut lp).unwrap(); - let time_of_write = chrono::Utc::now(); for _ in 0..count { for entry in lp_to_entries(&lp, &hour_partitioner()) { for write in entry.partition_writes().iter().flatten() { for batch in write.table_batches() { match chunk { Some(ref mut c) => { - c.write_table_batch(batch, time_of_write).unwrap(); + c.write_table_batch(batch).unwrap(); } None => { chunk = Some( - MBChunk::new( - ChunkMetrics::new_unregistered(), - batch, - time_of_write, - ) - .unwrap(), + MBChunk::new(ChunkMetrics::new_unregistered(), batch).unwrap(), ); } } diff --git a/server_benchmarks/benches/write.rs b/server_benchmarks/benches/write.rs index dcedd36430..51ec6b22f4 100644 --- a/server_benchmarks/benches/write.rs +++ b/server_benchmarks/benches/write.rs @@ -11,23 +11,17 @@ use std::io::Read; fn write_chunk(count: usize, entries: &[Entry]) { let mut chunk: Option = None; - let time_of_write = chrono::Utc::now(); for _ in 0..count { for entry in entries { for write in entry.partition_writes().iter().flatten() { for batch in write.table_batches() { match chunk { Some(ref mut c) => { - c.write_table_batch(batch, time_of_write).unwrap(); + c.write_table_batch(batch).unwrap(); } None => { chunk = Some( - MBChunk::new( - ChunkMetrics::new_unregistered(), - batch, - time_of_write, - ) - .unwrap(), + MBChunk::new(ChunkMetrics::new_unregistered(), batch).unwrap(), ); } }