refactor: Remove first/last write times from MUB chunks

pull/24376/head
Carol (Nichols || Goulding) 2021-07-26 16:29:05 -04:00
parent 0f5398c4b9
commit 4689b5e4e5
7 changed files with 26 additions and 104 deletions

View File

@ -3,8 +3,7 @@ use crate::{
column::{self, Column},
};
use arrow::record_batch::RecordBatch;
use chrono::{DateTime, Utc};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummaryAndTimes};
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary};
use entry::TableBatch;
use hashbrown::HashMap;
use internal_types::{
@ -84,25 +83,12 @@ pub struct MBChunk {
/// Note: This is a mutex to allow mutation within
/// `Chunk::snapshot()` which only takes an immutable borrow
snapshot: Mutex<Option<Arc<ChunkSnapshot>>>,
/// Time at which the first data was written into this chunk. Note
/// this is not the same as the timestamps on the data itself
time_of_first_write: DateTime<Utc>,
/// Most recent time at which data write was initiated into this
/// chunk. Note this is not the same as the timestamps on the data
/// itself
time_of_last_write: DateTime<Utc>,
}
impl MBChunk {
/// Create a new batch and write the contents of the [`TableBatch`] into it. Chunks
/// shouldn't exist without some data.
pub fn new(
metrics: ChunkMetrics,
batch: TableBatch<'_>,
time_of_write: DateTime<Utc>,
) -> Result<Self> {
pub fn new(metrics: ChunkMetrics, batch: TableBatch<'_>) -> Result<Self> {
let table_name = Arc::from(batch.name());
let mut chunk = Self {
@ -110,8 +96,6 @@ impl MBChunk {
columns: Default::default(),
metrics,
snapshot: Mutex::new(None),
time_of_first_write: time_of_write,
time_of_last_write: time_of_write,
};
let columns = batch.columns();
@ -123,11 +107,7 @@ impl MBChunk {
/// Write the contents of a [`TableBatch`] into this Chunk.
///
/// Panics if the batch specifies a different name for the table in this Chunk
pub fn write_table_batch(
&mut self,
batch: TableBatch<'_>,
time_of_write: DateTime<Utc>,
) -> Result<()> {
pub fn write_table_batch(&mut self, batch: TableBatch<'_>) -> Result<()> {
let table_name = batch.name();
assert_eq!(
table_name,
@ -143,10 +123,6 @@ impl MBChunk {
.try_lock()
.expect("concurrent readers/writers to MBChunk") = None;
// DateTime<Utc> is not necessarily monotonic
self.time_of_first_write = self.time_of_first_write.min(time_of_write);
self.time_of_last_write = self.time_of_last_write.max(time_of_write);
Ok(())
}
@ -227,7 +203,7 @@ impl MBChunk {
}
/// Returns a table summary for this chunk
pub fn table_summary(&self) -> TableSummaryAndTimes {
pub fn table_summary(&self) -> TableSummary {
let mut columns: Vec<_> = self
.columns
.iter()
@ -245,11 +221,9 @@ impl MBChunk {
columns.sort_by(|a, b| a.name.cmp(&b.name));
TableSummaryAndTimes {
TableSummary {
name: self.table_name.to_string(),
columns,
time_of_first_write: self.time_of_first_write,
time_of_last_write: self.time_of_last_write,
}
}
@ -355,7 +329,6 @@ pub mod test_helpers {
/// server id of 1.
pub fn write_lp_to_chunk(lp: &str, chunk: &mut MBChunk) -> Result<()> {
let entry = lp_to_entry(lp);
let time_of_write = Utc::now();
for w in entry.partition_writes().unwrap() {
let table_batches = w.table_batches();
@ -370,7 +343,7 @@ pub mod test_helpers {
);
for batch in table_batches {
chunk.write_table_batch(batch, time_of_write)?;
chunk.write_table_batch(batch)?;
}
}
@ -379,7 +352,6 @@ pub mod test_helpers {
pub fn write_lp_to_new_chunk(lp: &str) -> Result<MBChunk> {
let entry = lp_to_entry(lp);
let time_of_write = Utc::now();
let mut chunk: Option<MBChunk> = None;
for w in entry.partition_writes().unwrap() {
@ -396,13 +368,9 @@ pub mod test_helpers {
for batch in table_batches {
match chunk {
Some(ref mut c) => c.write_table_batch(batch, time_of_write)?,
Some(ref mut c) => c.write_table_batch(batch)?,
None => {
chunk = Some(MBChunk::new(
ChunkMetrics::new_unregistered(),
batch,
time_of_write,
)?);
chunk = Some(MBChunk::new(ChunkMetrics::new_unregistered(), batch)?);
}
}
}
@ -446,29 +414,11 @@ mod tests {
#[test]
fn writes_table_3_batches() {
let before_creation = Utc::now();
let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n");
let mut chunk = write_lp_to_new_chunk(&lp).unwrap();
let after_creation = Utc::now();
// There was only one write so far, so first and last write times should be equal
let first_write = chunk.time_of_first_write;
assert_eq!(first_write, chunk.time_of_last_write);
assert!(before_creation < first_write);
assert!(first_write < after_creation);
let lp = vec!["cpu,host=c val=11 1"].join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let after_write = Utc::now();
// Now the first and last times should be different
assert_ne!(chunk.time_of_first_write, chunk.time_of_last_write);
// The first write time should not have been updated
assert_eq!(chunk.time_of_first_write, first_write);
// The last write time should have been updated
assert!(after_creation < chunk.time_of_last_write);
assert!(chunk.time_of_last_write < after_write);
let lp = vec!["cpu,host=a val=14 2"].join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
@ -489,8 +439,6 @@ mod tests {
#[test]
fn test_summary() {
let before_write = Utc::now();
let lp = r#"
cpu,host=a val=23 1
cpu,host=b,env=prod val=2 1
@ -499,14 +447,6 @@ mod tests {
"#;
let chunk = write_lp_to_new_chunk(&lp).unwrap();
let after_write = Utc::now();
// There was only one write, so first and last write times should be equal
assert_eq!(chunk.time_of_first_write, chunk.time_of_last_write);
assert!(before_write < chunk.time_of_first_write);
assert!(chunk.time_of_first_write < after_write);
let summary = chunk.table_summary();
assert_eq!(summary.name, "cpu");
let expected_column_summaries = vec![

View File

@ -2,7 +2,7 @@ use super::MBChunk;
use arrow::record_batch::RecordBatch;
use data_types::{
error::ErrorLogger,
partition_metadata::{Statistics, TableSummaryAndTimes},
partition_metadata::{Statistics, TableSummary},
timestamp::TimestampRange,
};
use internal_types::{
@ -31,7 +31,7 @@ pub struct ChunkSnapshot {
schema: Arc<Schema>,
batch: RecordBatch,
table_name: Arc<str>,
summary: TableSummaryAndTimes,
summary: TableSummary,
}
impl ChunkSnapshot {
@ -100,7 +100,7 @@ impl ChunkSnapshot {
}
/// Returns a table summary for this chunk
pub fn table_summary(&self) -> &TableSummaryAndTimes {
pub fn table_summary(&self) -> &TableSummary {
&self.summary
}

View File

@ -1111,9 +1111,8 @@ impl Db {
let mb_chunk =
chunk.mutable_buffer().expect("cannot mutate open chunk");
if let Err(e) = mb_chunk
.write_table_batch(table_batch, time_of_write)
.context(WriteEntry {
if let Err(e) =
mb_chunk.write_table_batch(table_batch).context(WriteEntry {
partition_key,
chunk_id,
})
@ -1882,7 +1881,7 @@ mod tests {
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
assert_metric("catalog_loaded_rows", "object_store", 0.0);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1319)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1295)
.unwrap();
db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
@ -3194,7 +3193,7 @@ mod tests {
id: 0,
storage: ChunkStorage::ClosedMutableBuffer,
lifecycle_action,
memory_bytes: 2510,
memory_bytes: 2486,
object_store_bytes: 0, // no OS chunks
row_count: 1,
time_of_last_access: None,
@ -3230,7 +3229,7 @@ mod tests {
);
}
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2510 + 87);
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87);
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2434);
assert_eq!(db.catalog.metrics().memory().object_store(), 898);
}

View File

@ -277,15 +277,10 @@ impl CatalogChunk {
time_of_write: DateTime<Utc>,
metrics: ChunkMetrics,
) -> Result<Self> {
let chunk =
MBChunk::new(mb_chunk_metrics, batch, time_of_write).context(CreateOpenChunk)?;
let chunk = MBChunk::new(mb_chunk_metrics, batch).context(CreateOpenChunk)?;
assert_eq!(chunk.table_name(), &addr.table_name);
let summary = chunk.table_summary();
let time_of_first_write = summary.time_of_first_write;
let time_of_last_write = summary.time_of_last_write;
let stage = ChunkStage::Open { mb_chunk: chunk };
metrics
@ -298,8 +293,8 @@ impl CatalogChunk {
lifecycle_action: None,
metrics,
access_recorder: Default::default(),
time_of_first_write,
time_of_last_write,
time_of_first_write: time_of_write,
time_of_last_write: time_of_write,
time_closed: None,
};
chunk.update_metrics();
@ -596,7 +591,7 @@ impl CatalogChunk {
match &self.stage {
ChunkStage::Open { mb_chunk, .. } => {
// The stats for open chunks change so can't be cached
Arc::new(mb_chunk.table_summary().into())
Arc::new(mb_chunk.table_summary())
}
ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.table_summary),
ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.table_summary),
@ -665,7 +660,7 @@ impl CatalogChunk {
// Cache table summary + schema
let metadata = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary().into()),
table_summary: Arc::new(mb_chunk.table_summary()),
schema: s.full_schema(),
};

View File

@ -114,7 +114,7 @@ impl DbChunk {
chunk: Arc::clone(&snapshot),
};
let meta = ChunkMetadata {
table_summary: Arc::new(mb_chunk.table_summary().into()),
table_summary: Arc::new(mb_chunk.table_summary()),
schema: snapshot.full_schema(),
};
(state, Arc::new(meta))

View File

@ -17,23 +17,17 @@ fn chunk(count: usize) -> MBChunk {
let mut lp = String::new();
gz.read_to_string(&mut lp).unwrap();
let time_of_write = chrono::Utc::now();
for _ in 0..count {
for entry in lp_to_entries(&lp, &hour_partitioner()) {
for write in entry.partition_writes().iter().flatten() {
for batch in write.table_batches() {
match chunk {
Some(ref mut c) => {
c.write_table_batch(batch, time_of_write).unwrap();
c.write_table_batch(batch).unwrap();
}
None => {
chunk = Some(
MBChunk::new(
ChunkMetrics::new_unregistered(),
batch,
time_of_write,
)
.unwrap(),
MBChunk::new(ChunkMetrics::new_unregistered(), batch).unwrap(),
);
}
}

View File

@ -11,23 +11,17 @@ use std::io::Read;
fn write_chunk(count: usize, entries: &[Entry]) {
let mut chunk: Option<MBChunk> = None;
let time_of_write = chrono::Utc::now();
for _ in 0..count {
for entry in entries {
for write in entry.partition_writes().iter().flatten() {
for batch in write.table_batches() {
match chunk {
Some(ref mut c) => {
c.write_table_batch(batch, time_of_write).unwrap();
c.write_table_batch(batch).unwrap();
}
None => {
chunk = Some(
MBChunk::new(
ChunkMetrics::new_unregistered(),
batch,
time_of_write,
)
.unwrap(),
MBChunk::new(ChunkMetrics::new_unregistered(), batch).unwrap(),
);
}
}