feat: Record time of first/last write on MBChunk
parent
22495dd355
commit
b9a6a11b34
|
@ -3,6 +3,7 @@ use crate::{
|
|||
column::{self, Column},
|
||||
};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use chrono::{DateTime, Utc};
|
||||
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary};
|
||||
use entry::{Sequence, TableBatch};
|
||||
use hashbrown::HashMap;
|
||||
|
@ -86,6 +87,15 @@ pub struct MBChunk {
|
|||
/// Note: This is a mutex to allow mutation within
|
||||
/// `Chunk::snapshot()` which only takes an immutable borrow
|
||||
snapshot: Mutex<Option<Arc<ChunkSnapshot>>>,
|
||||
|
||||
/// Time at which the first data was written into this chunk. Note
|
||||
/// this is not the same as the timestamps on the data itself.
|
||||
time_of_first_write: DateTime<Utc>,
|
||||
|
||||
/// Most recent time at which a data write was initiated into this
|
||||
/// chunk. Note this is not the same as the timestamps on the data
|
||||
/// itself.
|
||||
time_of_last_write: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl MBChunk {
|
||||
|
@ -98,11 +108,15 @@ impl MBChunk {
|
|||
) -> Result<Self> {
|
||||
let table_name = Arc::from(batch.name());
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
let mut chunk = Self {
|
||||
table_name,
|
||||
columns: Default::default(),
|
||||
metrics,
|
||||
snapshot: Mutex::new(None),
|
||||
time_of_first_write: now,
|
||||
time_of_last_write: now,
|
||||
};
|
||||
|
||||
let columns = batch.columns();
|
||||
|
@ -138,6 +152,7 @@ impl MBChunk {
|
|||
.expect("concurrent readers/writers to MBChunk") = None;
|
||||
|
||||
self.metrics.memory_bytes.set(self.size());
|
||||
self.time_of_last_write = Utc::now();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -443,11 +458,29 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn writes_table_3_batches() {
|
||||
let before_creation = Utc::now();
|
||||
let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n");
|
||||
let mut chunk = write_lp_to_new_chunk(&lp).unwrap();
|
||||
let after_creation = Utc::now();
|
||||
|
||||
// There was only one write so far, so first and last write times should be equal
|
||||
let first_write = chunk.time_of_first_write;
|
||||
assert_eq!(first_write, chunk.time_of_last_write);
|
||||
|
||||
assert!(before_creation < first_write);
|
||||
assert!(first_write < after_creation);
|
||||
|
||||
let lp = vec!["cpu,host=c val=11 1"].join("\n");
|
||||
write_lp_to_chunk(&lp, &mut chunk).unwrap();
|
||||
let after_write = Utc::now();
|
||||
|
||||
// Now the first and last times should be different
|
||||
assert_ne!(chunk.time_of_first_write, chunk.time_of_last_write);
|
||||
// The first write time should not have been updated
|
||||
assert_eq!(chunk.time_of_first_write, first_write);
|
||||
// The last write time should have been updated
|
||||
assert!(after_creation < chunk.time_of_last_write);
|
||||
assert!(chunk.time_of_last_write < after_write);
|
||||
|
||||
let lp = vec!["cpu,host=a val=14 2"].join("\n");
|
||||
write_lp_to_chunk(&lp, &mut chunk).unwrap();
|
||||
|
@ -469,6 +502,8 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_summary() {
|
||||
let before_write = Utc::now();
|
||||
|
||||
let lp = r#"
|
||||
cpu,host=a val=23 1
|
||||
cpu,host=b,env=prod val=2 1
|
||||
|
@ -477,7 +512,16 @@ mod tests {
|
|||
"#;
|
||||
let chunk = write_lp_to_new_chunk(&lp).unwrap();
|
||||
|
||||
let after_write = Utc::now();
|
||||
|
||||
// There was only one write, so first and last write times should be equal
|
||||
assert_eq!(chunk.time_of_first_write, chunk.time_of_last_write);
|
||||
|
||||
assert!(before_write < chunk.time_of_first_write);
|
||||
assert!(chunk.time_of_first_write < after_write);
|
||||
|
||||
let summary = chunk.table_summary();
|
||||
|
||||
assert_eq!(
|
||||
summary,
|
||||
TableSummary {
|
||||
|
|
Loading…
Reference in New Issue