diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index 1bec696cc7..243a642913 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -3,6 +3,7 @@ use crate::{ column::{self, Column}, }; use arrow::record_batch::RecordBatch; +use chrono::{DateTime, Utc}; use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary}; use entry::{Sequence, TableBatch}; use hashbrown::HashMap; @@ -86,6 +87,15 @@ pub struct MBChunk { /// Note: This is a mutex to allow mutation within /// `Chunk::snapshot()` which only takes an immutable borrow snapshot: Mutex>>, + + /// Time at which the first data was written into this chunk. Note + /// this is not the same as the timestamps on the data itself + time_of_first_write: DateTime, + + /// Most recent time at which data write was initiated into this + /// chunk. Note this is not the same as the timestamps on the data + /// itself + time_of_last_write: DateTime, } impl MBChunk { @@ -98,11 +108,15 @@ impl MBChunk { ) -> Result { let table_name = Arc::from(batch.name()); + let now = Utc::now(); + let mut chunk = Self { table_name, columns: Default::default(), metrics, snapshot: Mutex::new(None), + time_of_first_write: now, + time_of_last_write: now, }; let columns = batch.columns(); @@ -138,6 +152,7 @@ impl MBChunk { .expect("concurrent readers/writers to MBChunk") = None; self.metrics.memory_bytes.set(self.size()); + self.time_of_last_write = Utc::now(); Ok(()) } @@ -443,11 +458,29 @@ mod tests { #[test] fn writes_table_3_batches() { + let before_creation = Utc::now(); let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n"); let mut chunk = write_lp_to_new_chunk(&lp).unwrap(); + let after_creation = Utc::now(); + + // There was only one write so far, so first and last write times should be equal + let first_write = chunk.time_of_first_write; + assert_eq!(first_write, chunk.time_of_last_write); + + assert!(before_creation < first_write); + assert!(first_write < after_creation); let lp = vec!["cpu,host=c val=11 1"].join("\n"); write_lp_to_chunk(&lp, &mut chunk).unwrap(); + let after_write = Utc::now(); + + // Now the first and last times should be different + assert_ne!(chunk.time_of_first_write, chunk.time_of_last_write); + // The first write time should not have been updated + assert_eq!(chunk.time_of_first_write, first_write); + // The last write time should have been updated + assert!(after_creation < chunk.time_of_last_write); + assert!(chunk.time_of_last_write < after_write); let lp = vec!["cpu,host=a val=14 2"].join("\n"); write_lp_to_chunk(&lp, &mut chunk).unwrap(); @@ -469,6 +502,8 @@ mod tests { #[test] fn test_summary() { + let before_write = Utc::now(); + let lp = r#" cpu,host=a val=23 1 cpu,host=b,env=prod val=2 1 @@ -477,7 +512,16 @@ mod tests { "#; let chunk = write_lp_to_new_chunk(&lp).unwrap(); + let after_write = Utc::now(); + + // There was only one write, so first and last write times should be equal + assert_eq!(chunk.time_of_first_write, chunk.time_of_last_write); + + assert!(before_write < chunk.time_of_first_write); + assert!(chunk.time_of_first_write < after_write); + let summary = chunk.table_summary(); + assert_eq!( summary, TableSummary {