feat: Record first/last write times for RUB chunks

pull/24376/head
Carol (Nichols || Goulding) 2021-07-14 13:34:21 -04:00
parent 28fc01ecee
commit 8d1d877196
6 changed files with 52 additions and 18 deletions

1
Cargo.lock generated
View File

@ -3269,6 +3269,7 @@ version = "0.1.0"
dependencies = [
"arrow",
"arrow_util",
"chrono",
"criterion",
"croaring",
"data_types",

View File

@ -13,6 +13,7 @@ edition = "2018"
[dependencies] # In alphabetical order
arrow = { version = "5.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" }
chrono = "0.4"
croaring = "0.5"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }

View File

@ -5,7 +5,7 @@ use crate::{
table::{self, Table},
};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummaryAndTimes};
use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection};
use metrics::{Gauge, KeyValue};
use observability_deps::tracing::info;
@ -193,7 +193,7 @@ impl Chunk {
///
/// TODO(edd): consider deprecating or changing to return information about
/// the physical layout of the data in the chunk.
pub fn table_summary(&self) -> TableSummary {
pub fn table_summary(&self) -> TableSummaryAndTimes {
// Delegates to the inner `Table`; the returned summary now also carries
// time_of_first_write / time_of_last_write tracked by the table's MetaData.
self.table.table_summary()
}

View File

@ -5,7 +5,8 @@ use crate::{
value::{OwnedValue, Scalar, Value},
};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use chrono::{DateTime, Utc};
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummaryAndTimes};
use internal_types::selection::Selection;
use parking_lot::RwLock;
use snafu::{ensure, Snafu};
@ -189,7 +190,7 @@ impl Table {
}
/// Return a summary of all columns in this table
pub fn table_summary(&self) -> TableSummary {
pub fn table_summary(&self) -> TableSummaryAndTimes {
// Takes the table_data read lock and builds the summary from `meta`,
// which includes the first/last write timestamps recorded on MetaData.
self.table_data.read().meta.to_summary(&self.name)
}
@ -557,15 +558,27 @@ pub struct MetaData {
// The names of the columns for this table in the order they appear.
column_names: Vec<String>,
/// Time at which the first data was written into this table. Note
/// this is not the same as the timestamps on the data itself
time_of_first_write: DateTime<Utc>,
/// Most recent time at which data write was initiated into this
/// chunk. Note this is not the same as the timestamps on the data
/// itself
time_of_last_write: DateTime<Utc>,
}
impl MetaData {
/// Build `MetaData` from the first row group added to a table.
/// Records the current wall-clock time as both the first and last
/// write time, so the two start out equal for a new table.
pub fn new(rg: &row_group::RowGroup) -> Self {
// Capture one timestamp and reuse it for both fields so
// `time_of_first_write == time_of_last_write` on creation.
let now = Utc::now();
Self {
rgs_size: rg.size(),
rows: rg.rows() as u64,
columns: rg.metadata().columns.clone(),
column_names: rg.metadata().column_names.clone(),
time_of_first_write: now,
time_of_last_write: now,
}
}
@ -585,6 +598,7 @@ impl MetaData {
/// Create a new `MetaData` by consuming `this` and incorporating `other`.
pub fn update_with(mut this: Self, rg: &row_group::RowGroup) -> Self {
let now = Utc::now();
let other_meta = rg.metadata();
// first non-empty row group added to the table.
@ -593,6 +607,8 @@ impl MetaData {
this.rows = rg.rows() as u64;
this.columns = other_meta.columns.clone();
this.column_names = other_meta.column_names.clone();
this.time_of_first_write = now;
this.time_of_last_write = now;
return this;
}
@ -603,9 +619,10 @@ impl MetaData {
// existing row groups in the table.
assert_eq!(&this.columns, &other_meta.columns);
// update size, rows, column ranges, time range
// update size, rows, last write, column ranges, time range
this.rgs_size += rg.size();
this.rows += rg.rows() as u64;
this.time_of_last_write = now;
// Update the table schema using the incoming row group schema
for (column_name, column_meta) in &other_meta.columns {
@ -685,7 +702,7 @@ impl MetaData {
self.column_names.iter().map(|name| name.as_str()).collect()
}
pub fn to_summary(&self, table_name: impl Into<String>) -> TableSummary {
pub fn to_summary(&self, table_name: impl Into<String>) -> TableSummaryAndTimes {
use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics};
let columns = self
.columns
@ -749,9 +766,11 @@ impl MetaData {
})
.collect();
TableSummary {
TableSummaryAndTimes {
name: table_name.into(),
columns,
time_of_first_write: self.time_of_first_write,
time_of_last_write: self.time_of_last_write,
}
}
@ -1002,6 +1021,7 @@ mod test {
#[test]
fn meta_data_update_with() {
let before_creation = Utc::now();
let columns = vec![
(
"time".to_string(),
@ -1015,6 +1035,7 @@ mod test {
let rg = RowGroup::new(3, columns);
let mut meta = MetaData::new(&rg);
let after_creation = Utc::now();
assert_eq!(meta.rows, 3);
let meta_size = meta.rgs_size;
assert!(meta_size > 0);
@ -1025,6 +1046,10 @@ mod test {
OwnedValue::String("west".to_owned())
)
);
let first_write = meta.time_of_first_write;
assert_eq!(first_write, meta.time_of_last_write);
assert!(before_creation < first_write);
assert!(first_write < after_creation);
let columns = vec![
("time".to_string(), ColumnType::create_time(&[10, 400])),
@ -1036,6 +1061,7 @@ mod test {
let rg = RowGroup::new(2, columns);
meta = MetaData::update_with(meta, &rg);
let after_update = Utc::now();
assert_eq!(meta.rows, 5);
assert!(meta.rgs_size > meta_size);
assert_eq!(
@ -1045,6 +1071,10 @@ mod test {
OwnedValue::String("west".to_owned())
)
);
assert_ne!(meta.time_of_first_write, meta.time_of_last_write);
assert_eq!(meta.time_of_first_write, first_write);
assert!(after_creation < meta.time_of_last_write);
assert!(meta.time_of_last_write < after_update);
}
#[test]

View File

@ -1833,7 +1833,7 @@ mod tests {
// verify chunk size updated (chunk moved from closing to moving to moved)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
let expected_read_buffer_size = 1613;
let expected_read_buffer_size = 1637;
catalog_chunk_size_bytes_metric_eq(
&test_db.metric_registry,
"read_buffer",
@ -2050,7 +2050,7 @@ mod tests {
.unwrap();
// verify chunk size updated (chunk moved from moved to writing to written)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1613).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1637).unwrap();
// drop, the chunk from the read buffer
db.drop_chunk("cpu", partition_key, mb_chunk.id())
@ -2181,7 +2181,7 @@ mod tests {
("svr_id", "1"),
])
.histogram()
.sample_sum_eq(3191.0)
.sample_sum_eq(3215.0)
.unwrap();
let rb = collect_read_filter(&rb_chunk).await;
@ -2291,7 +2291,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(2260.0)
.sample_sum_eq(2284.0)
.unwrap();
// while MB and RB chunk are identical, the PQ chunk is a new one (split off)
@ -2411,7 +2411,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(2260.0)
.sample_sum_eq(2284.0)
.unwrap();
// Unload RB chunk but keep it in OS
@ -2868,8 +2868,8 @@ mod tests {
2,
ChunkStorage::ReadBufferAndObjectStore,
lifecycle_action,
3236,
1479,
3260, // size of RB and OS chunks
1479, // size of parquet file
2,
),
ChunkSummary::new_without_timestamps(
@ -2903,7 +2903,7 @@ mod tests {
}
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2398 + 87);
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2410);
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2434);
assert_eq!(db.catalog.metrics().memory().object_store(), 826);
}

View File

@ -298,10 +298,12 @@ impl CatalogChunk {
metrics: ChunkMetrics,
) -> Self {
let summary = chunk.table_summary();
let first_write = summary.time_of_first_write;
let last_write = summary.time_of_last_write;
let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata {
table_summary: Arc::new(summary),
table_summary: Arc::new(summary.into()),
schema,
}),
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
@ -317,8 +319,8 @@ impl CatalogChunk {
lifecycle_action: None,
metrics,
access_recorder: Default::default(),
time_of_first_write: None,
time_of_last_write: None,
time_of_first_write: Some(first_write),
time_of_last_write: Some(last_write),
time_closed: None,
};
chunk.update_metrics();