refactor: Move first/last write times up to read buffer Chunk rather than MetaData

pull/24376/head
Carol (Nichols || Goulding) 2021-07-22 11:14:54 -04:00
parent 14cb2a6bef
commit 05782eb980
2 changed files with 60 additions and 105 deletions

View File

@ -6,7 +6,10 @@ use crate::{
}; };
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummaryAndTimes}; use data_types::{
chunk_metadata::ChunkColumnSummary,
partition_metadata::{TableSummary, TableSummaryAndTimes},
};
use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection}; use internal_types::{schema::builder::Error as SchemaError, schema::Schema, selection::Selection};
use metrics::{Gauge, KeyValue}; use metrics::{Gauge, KeyValue};
use observability_deps::tracing::info; use observability_deps::tracing::info;
@ -46,6 +49,15 @@ pub struct Chunk {
// The table associated with the chunk. // The table associated with the chunk.
pub(crate) table: Table, pub(crate) table: Table,
/// Time at which the first data was written into this table. Note
/// this is not the same as the timestamps on the data itself
time_of_first_write: DateTime<Utc>,
/// Most recent time at which data write was initiated into this
/// chunk. Note this is not the same as the timestamps on the data
/// itself
time_of_last_write: DateTime<Utc>,
} }
impl Chunk { impl Chunk {
@ -61,16 +73,16 @@ impl Chunk {
let row_group = record_batch_to_row_group_with_logging(&table_name, table_data); let row_group = record_batch_to_row_group_with_logging(&table_name, table_data);
let storage_statistics = row_group.column_storage_statistics(); let storage_statistics = row_group.column_storage_statistics();
let table = Table::with_row_group( let table = Table::with_row_group(table_name, row_group);
table_name,
row_group,
time_of_first_write,
time_of_last_write,
);
metrics.update_column_storage_statistics(&storage_statistics); metrics.update_column_storage_statistics(&storage_statistics);
Self { metrics, table } Self {
metrics,
table,
time_of_first_write,
time_of_last_write,
}
} }
// Only used in tests and benchmarks // Only used in tests and benchmarks
@ -82,7 +94,9 @@ impl Chunk {
let now = Utc::now(); let now = Utc::now();
Self { Self {
metrics, metrics,
table: Table::with_row_group(table_name, row_group, now, now), table: Table::with_row_group(table_name, row_group),
time_of_first_write: now,
time_of_last_write: now,
} }
} }
@ -128,6 +142,9 @@ impl Chunk {
self.table.add_row_group(row_group); self.table.add_row_group(row_group);
// update last write time
self.time_of_last_write = Utc::now();
// update column metrics associated with column storage // update column metrics associated with column storage
self.metrics self.metrics
.update_column_storage_statistics(&storage_statistics); .update_column_storage_statistics(&storage_statistics);
@ -204,7 +221,13 @@ impl Chunk {
/// TODO(edd): consider deprecating or changing to return information about /// TODO(edd): consider deprecating or changing to return information about
/// the physical layout of the data in the chunk. /// the physical layout of the data in the chunk.
pub fn table_summary(&self) -> TableSummaryAndTimes { pub fn table_summary(&self) -> TableSummaryAndTimes {
self.table.table_summary() let TableSummary { name, columns } = self.table.table_summary();
TableSummaryAndTimes {
name,
columns,
time_of_first_write: self.time_of_first_write,
time_of_last_write: self.time_of_last_write,
}
} }
/// Returns a schema object for a `read_filter` operation using the provided /// Returns a schema object for a `read_filter` operation using the provided

View File

@ -5,8 +5,7 @@ use crate::{
value::{OwnedValue, Scalar, Value}, value::{OwnedValue, Scalar, Value},
}; };
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use chrono::{DateTime, Utc}; use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummaryAndTimes};
use internal_types::selection::Selection; use internal_types::selection::Selection;
use parking_lot::RwLock; use parking_lot::RwLock;
use snafu::{ensure, Snafu}; use snafu::{ensure, Snafu};
@ -80,16 +79,11 @@ struct RowGroupData {
impl Table { impl Table {
/// Create a new table with the provided row_group. Creating an empty table is not possible. /// Create a new table with the provided row_group. Creating an empty table is not possible.
pub fn with_row_group( pub fn with_row_group(name: impl Into<String>, rg: RowGroup) -> Self {
name: impl Into<String>,
rg: RowGroup,
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
) -> Self {
Self { Self {
name: name.into(), name: name.into(),
table_data: RwLock::new(RowGroupData { table_data: RwLock::new(RowGroupData {
meta: Arc::new(MetaData::new(&rg, time_of_first_write, time_of_last_write)), meta: Arc::new(MetaData::new(&rg)),
data: vec![Arc::new(rg)], data: vec![Arc::new(rg)],
}), }),
} }
@ -123,10 +117,7 @@ impl Table {
ensure!(row_groups.data.len() > 1, EmptyTableError); ensure!(row_groups.data.len() > 1, EmptyTableError);
row_groups.data.remove(position); // removes row group data row_groups.data.remove(position); // removes row group data
row_groups.meta = Arc::new(MetaData::from(( row_groups.meta = Arc::new(MetaData::from(row_groups.data.as_ref())); // rebuild meta
row_groups.data.as_ref(),
row_groups.meta.as_ref(),
))); // rebuild meta
Ok(()) Ok(())
} }
@ -198,7 +189,7 @@ impl Table {
} }
/// Return a summary of all columns in this table /// Return a summary of all columns in this table
pub fn table_summary(&self) -> TableSummaryAndTimes { pub fn table_summary(&self) -> TableSummary {
self.table_data.read().meta.to_summary(&self.name) self.table_data.read().meta.to_summary(&self.name)
} }
@ -566,30 +557,15 @@ pub struct MetaData {
// The names of the columns for this table in the order they appear. // The names of the columns for this table in the order they appear.
column_names: Vec<String>, column_names: Vec<String>,
/// Time at which the first data was written into this table. Note
/// this is not the same as the timestamps on the data itself
time_of_first_write: DateTime<Utc>,
/// Most recent time at which data write was initiated into this
/// chunk. Note this is not the same as the timestamps on the data
/// itself
time_of_last_write: DateTime<Utc>,
} }
impl MetaData { impl MetaData {
pub fn new( pub fn new(rg: &row_group::RowGroup) -> Self {
rg: &row_group::RowGroup,
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
) -> Self {
Self { Self {
rgs_size: rg.size(), rgs_size: rg.size(),
rows: rg.rows() as u64, rows: rg.rows() as u64,
columns: rg.metadata().columns.clone(), columns: rg.metadata().columns.clone(),
column_names: rg.metadata().column_names.clone(), column_names: rg.metadata().column_names.clone(),
time_of_first_write,
time_of_last_write,
} }
} }
@ -609,7 +585,6 @@ impl MetaData {
/// Create a new `MetaData` by consuming `this` and incorporating `other`. /// Create a new `MetaData` by consuming `this` and incorporating `other`.
pub fn update_with(mut this: Self, rg: &row_group::RowGroup) -> Self { pub fn update_with(mut this: Self, rg: &row_group::RowGroup) -> Self {
let now = Utc::now();
let other_meta = rg.metadata(); let other_meta = rg.metadata();
// first non-empty row group added to the table. // first non-empty row group added to the table.
@ -618,8 +593,6 @@ impl MetaData {
this.rows = rg.rows() as u64; this.rows = rg.rows() as u64;
this.columns = other_meta.columns.clone(); this.columns = other_meta.columns.clone();
this.column_names = other_meta.column_names.clone(); this.column_names = other_meta.column_names.clone();
this.time_of_first_write = now;
this.time_of_last_write = now;
return this; return this;
} }
@ -630,10 +603,9 @@ impl MetaData {
// existing row groups in the table. // existing row groups in the table.
assert_eq!(&this.columns, &other_meta.columns); assert_eq!(&this.columns, &other_meta.columns);
// update size, rows, last write, column ranges, time range // update size, rows, column ranges, time range
this.rgs_size += rg.size(); this.rgs_size += rg.size();
this.rows += rg.rows() as u64; this.rows += rg.rows() as u64;
this.time_of_last_write = now;
// Update the table schema using the incoming row group schema // Update the table schema using the incoming row group schema
for (column_name, column_meta) in &other_meta.columns { for (column_name, column_meta) in &other_meta.columns {
@ -713,7 +685,7 @@ impl MetaData {
self.column_names.iter().map(|name| name.as_str()).collect() self.column_names.iter().map(|name| name.as_str()).collect()
} }
pub fn to_summary(&self, table_name: impl Into<String>) -> TableSummaryAndTimes { pub fn to_summary(&self, table_name: impl Into<String>) -> TableSummary {
use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics}; use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics};
let columns = self let columns = self
.columns .columns
@ -777,11 +749,9 @@ impl MetaData {
}) })
.collect(); .collect();
TableSummaryAndTimes { TableSummary {
name: table_name.into(), name: table_name.into(),
columns, columns,
time_of_first_write: self.time_of_first_write,
time_of_last_write: self.time_of_last_write,
} }
} }
@ -790,19 +760,15 @@ impl MetaData {
} }
} }
// Builds new table meta-data from a collection of row groups and the collection's metadata. Useful // Builds new table meta-data from a collection of row groups. Useful
// for rebuilding state when a row group has been removed from the table. // for rebuilding state when a row group has been removed from the table.
impl From<(&[Arc<RowGroup>], &Self)> for MetaData { impl From<&[Arc<RowGroup>]> for MetaData {
fn from((row_groups, old_meta): (&[Arc<RowGroup>], &Self)) -> Self { fn from(row_groups: &[Arc<RowGroup>]) -> Self {
if row_groups.is_empty() { if row_groups.is_empty() {
panic!("row groups required for meta data construction"); panic!("row groups required for meta data construction");
} }
let mut meta = Self::new( let mut meta = Self::new(&row_groups[0]);
&row_groups[0],
old_meta.time_of_first_write,
old_meta.time_of_last_write,
);
for row_group in row_groups.iter().skip(1) { for row_group in row_groups.iter().skip(1) {
meta = Self::update_with(meta, &row_group); meta = Self::update_with(meta, &row_group);
} }
@ -1027,21 +993,15 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use crate::{
use crate::column::Column; column::Column,
use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult}; row_group::{BinaryExpr, ColumnType, ReadAggregateResult},
use crate::schema; schema::{self, LogicalDataType},
use crate::schema::LogicalDataType; value::{AggregateVec, OwnedValue, Scalar},
use crate::value::{AggregateVec, OwnedValue, Scalar}; };
fn table_now(name: impl Into<String>, rg: RowGroup) -> Table {
let now = Utc::now();
Table::with_row_group(name, rg, now, now)
}
#[test] #[test]
fn meta_data_update_with() { fn meta_data_update_with() {
let before_creation = Utc::now();
let columns = vec![ let columns = vec![
( (
"time".to_string(), "time".to_string(),
@ -1053,10 +1013,8 @@ mod test {
), ),
]; ];
let rg = RowGroup::new(3, columns); let rg = RowGroup::new(3, columns);
let now = Utc::now();
let mut meta = MetaData::new(&rg, now, now); let mut meta = MetaData::new(&rg);
let after_creation = Utc::now();
assert_eq!(meta.rows, 3); assert_eq!(meta.rows, 3);
let meta_size = meta.rgs_size; let meta_size = meta.rgs_size;
assert!(meta_size > 0); assert!(meta_size > 0);
@ -1067,10 +1025,6 @@ mod test {
OwnedValue::String("west".to_owned()) OwnedValue::String("west".to_owned())
) )
); );
let first_write = meta.time_of_first_write;
assert_eq!(first_write, meta.time_of_last_write);
assert!(before_creation < first_write);
assert!(first_write < after_creation);
let columns = vec![ let columns = vec![
("time".to_string(), ColumnType::create_time(&[10, 400])), ("time".to_string(), ColumnType::create_time(&[10, 400])),
@ -1082,7 +1036,6 @@ mod test {
let rg = RowGroup::new(2, columns); let rg = RowGroup::new(2, columns);
meta = MetaData::update_with(meta, &rg); meta = MetaData::update_with(meta, &rg);
let after_update = Utc::now();
assert_eq!(meta.rows, 5); assert_eq!(meta.rows, 5);
assert!(meta.rgs_size > meta_size); assert!(meta.rgs_size > meta_size);
assert_eq!( assert_eq!(
@ -1092,34 +1045,23 @@ mod test {
OwnedValue::String("west".to_owned()) OwnedValue::String("west".to_owned())
) )
); );
assert_ne!(meta.time_of_first_write, meta.time_of_last_write);
assert_eq!(meta.time_of_first_write, first_write);
assert!(after_creation < meta.time_of_last_write);
assert!(meta.time_of_last_write < after_update);
} }
#[test] #[test]
fn add_remove_row_groups() { fn add_remove_row_groups() {
let before_creation = Utc::now();
let tc = ColumnType::Time(Column::from(&[0_i64, 2, 3][..])); let tc = ColumnType::Time(Column::from(&[0_i64, 2, 3][..]));
let columns = vec![("time".to_string(), tc)]; let columns = vec![("time".to_string(), tc)];
let rg = RowGroup::new(3, columns); let rg = RowGroup::new(3, columns);
let mut table = table_now("cpu", rg); let mut table = Table::with_row_group("cpu", rg);
let after_creation = Utc::now();
assert_eq!(table.rows(), 3); assert_eq!(table.rows(), 3);
let first_write = table.table_summary().time_of_first_write;
assert_eq!(first_write, table.table_summary().time_of_last_write);
assert!(before_creation < first_write);
assert!(first_write < after_creation);
// add another row group // add another row group
let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5][..])); let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5][..]));
let columns = vec![("time".to_string(), tc)]; let columns = vec![("time".to_string(), tc)];
let rg = RowGroup::new(5, columns); let rg = RowGroup::new(5, columns);
table.add_row_group(rg); table.add_row_group(rg);
let after_add_row_group = Utc::now();
assert_eq!(table.rows(), 8); assert_eq!(table.rows(), 8);
assert_eq!( assert_eq!(
@ -1129,13 +1071,6 @@ mod test {
OwnedValue::Scalar(Scalar::I64(5)) OwnedValue::Scalar(Scalar::I64(5))
) )
); );
// First write time should be unchanged
assert_eq!(first_write, table.table_summary().time_of_first_write);
// Last write time should be updated
let last_write = table.table_summary().time_of_last_write;
assert_ne!(first_write, last_write);
assert!(after_creation < last_write);
assert!(last_write < after_add_row_group);
// remove the first row group // remove the first row group
table.drop_row_group(0).unwrap(); table.drop_row_group(0).unwrap();
@ -1147,9 +1082,6 @@ mod test {
OwnedValue::Scalar(Scalar::I64(5)) OwnedValue::Scalar(Scalar::I64(5))
) )
); );
// Write times should be unchanged
assert_eq!(first_write, table.table_summary().time_of_first_write);
assert_eq!(last_write, table.table_summary().time_of_last_write);
// attempt to remove the last row group. // attempt to remove the last row group.
table table
@ -1163,7 +1095,7 @@ mod test {
let fc = ColumnType::Field(Column::from(&[1000_u64, 1002, 1200][..])); let fc = ColumnType::Field(Column::from(&[1000_u64, 1002, 1200][..]));
let columns = vec![("time".to_string(), tc), ("count".to_string(), fc)]; let columns = vec![("time".to_string(), tc), ("count".to_string(), fc)];
let row_group = RowGroup::new(3, columns); let row_group = RowGroup::new(3, columns);
let mut table = table_now("cpu", row_group); let mut table = Table::with_row_group("cpu", row_group);
// add another row group // add another row group
let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..])); let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
@ -1197,7 +1129,7 @@ mod test {
("count".to_string(), fc), ("count".to_string(), fc),
]; ];
let row_group = RowGroup::new(3, columns); let row_group = RowGroup::new(3, columns);
let mut table = table_now("cpu", row_group); let mut table = Table::with_row_group("cpu", row_group);
// add another row group // add another row group
let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..])); let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
@ -1276,7 +1208,7 @@ mod test {
]; ];
let rg = RowGroup::new(6, columns); let rg = RowGroup::new(6, columns);
let mut table = table_now("cpu", rg); let mut table = Table::with_row_group("cpu", rg);
let exp_col_types = vec![ let exp_col_types = vec![
("region", LogicalDataType::String), ("region", LogicalDataType::String),
@ -1403,7 +1335,7 @@ mod test {
), ),
]; ];
let rg = RowGroup::new(3, columns); let rg = RowGroup::new(3, columns);
let mut table = table_now("cpu", rg); let mut table = Table::with_row_group("cpu", rg);
// Build another row group. // Build another row group.
let columns = vec![ let columns = vec![
@ -1545,7 +1477,7 @@ west,host-b,100
let columns = vec![("time".to_string(), tc), ("region".to_string(), rc)]; let columns = vec![("time".to_string(), tc), ("region".to_string(), rc)];
let rg = RowGroup::new(3, columns); let rg = RowGroup::new(3, columns);
let mut table = table_now("cpu", rg); let mut table = Table::with_row_group("cpu", rg);
// add another row group // add another row group
let tc = ColumnType::Time(Column::from(&[200_i64, 300, 400][..])); let tc = ColumnType::Time(Column::from(&[200_i64, 300, 400][..]));
@ -1617,7 +1549,7 @@ west,host-b,100
]; ];
let rg = RowGroup::new(4, columns); let rg = RowGroup::new(4, columns);
let table = table_now("cpu", rg); let table = Table::with_row_group("cpu", rg);
assert_eq!(table.time_range().unwrap(), (-100, 3)); assert_eq!(table.time_range().unwrap(), (-100, 3));
} }