refactor: inline `Table` into `parquet_file::chunk::Chunk`

Note that the resulting size estimates differ because we were
double-counting `Table`: `mem::size_of::<Self>()` already includes
non-boxed child structs, since their data is stored inline in the parent
structure.

Issue: #1295.
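
For illustration only (not part of this commit), a minimal sketch of the double-counting described above; `Parent` and `Child` are hypothetical stand-ins for the old `Chunk`/`Table` pair:

use std::mem;

// Hypothetical stand-ins for the old Chunk/Table pair.
struct Child {
    _values: [u64; 4], // 32 bytes, stored inline in Parent
}

struct Parent {
    child: Child, // not boxed, so its bytes are already part of Parent
    _key: u64,
}

impl Child {
    fn size(&self) -> usize {
        mem::size_of::<Self>()
    }
}

impl Parent {
    // Old-style estimate: adds the child's size on top of size_of::<Parent>(),
    // which already contains the inline Child, so the child is counted twice.
    fn size_double_counted(&self) -> usize {
        mem::size_of::<Self>() + self.child.size()
    }

    // Corrected estimate: size_of::<Parent>() alone covers the inline child.
    fn size(&self) -> usize {
        mem::size_of::<Self>()
    }
}

fn main() {
    let p = Parent {
        child: Child { _values: [0; 4] },
        _key: 0,
    };
    // The old estimate exceeds the corrected one by exactly one Child.
    assert_eq!(p.size_double_counted(), p.size() + mem::size_of::<Child>());
    println!(
        "double-counted: {} bytes, corrected: {} bytes",
        p.size_double_counted(),
        p.size()
    );
}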
pull/24376/head
Marco Neumann 2021-06-11 11:51:51 +02:00
parent 13dd4b23fd
commit f8a518bbed
7 changed files with 119 additions and 237 deletions


@@ -1,10 +1,16 @@
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, sync::Arc};
use crate::table::Table;
use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange};
use crate::storage::Storage;
use data_types::{
partition_metadata::{Statistics, TableSummary},
timestamp::TimestampRange,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::{schema::Schema, selection::Selection};
use internal_types::{
schema::{Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use object_store::{path::Path, ObjectStore};
use query::predicate::Predicate;
@@ -13,25 +19,15 @@ use std::mem;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error writing table '{}': {}", table_name, source))]
TableWrite {
table_name: String,
source: crate::table::Error,
},
#[snafu(display("Table Error in '{}': {}", table_name, source))]
NamedTableError {
table_name: String,
source: crate::table::Error,
},
#[snafu(display("Table '{}' not found in chunk", table_name))]
NamedTableNotFoundInChunk { table_name: String },
#[snafu(display("Error read parquet file for table '{}'", table_name,))]
ReadParquet {
table_name: String,
source: crate::table::Error,
#[snafu(display("Failed to read parquet: {}", source))]
ReadParquet { source: crate::storage::Error },
#[snafu(display("Failed to select columns: {}", source))]
SelectColumns {
source: internal_types::schema::Error,
},
}
@@ -64,8 +60,23 @@ pub struct Chunk {
/// Partition this chunk belongs to
partition_key: String,
/// The table in chunk
table: Table,
/// Meta data of the table
table_summary: Arc<TableSummary>,
/// Schema that goes with this table's parquet file
schema: Arc<Schema>,
/// Timestamp range of this table's parquet file
/// (extracted from TableSummary)
timestamp_range: Option<TimestampRange>,
/// Object store of the above relative path to open and read the file
object_store: Arc<ObjectStore>,
/// Path in the object store. Format:
/// <writer id>/<database>/data/<partition key>/<chunk
/// id>/<tablename>.parquet
object_store_path: Path,
metrics: ChunkMetrics,
}
@@ -79,11 +90,15 @@ impl Chunk {
schema: Schema,
metrics: ChunkMetrics,
) -> Self {
let table = Table::new(table_summary, file_location, store, schema);
let timestamp_range = extract_range(&table_summary);
let mut chunk = Self {
partition_key: part_key.into(),
table,
table_summary: Arc::new(table_summary),
schema: Arc::new(schema),
timestamp_range,
object_store: store,
object_store_path: file_location,
metrics,
};
@@ -97,64 +112,109 @@ impl Chunk {
}
/// Return object store path for this chunk
pub fn table_path(&self) -> Path {
self.table.path()
pub fn path(&self) -> Path {
self.object_store_path.clone()
}
/// Returns the summary statistics for this chunk
pub fn table_summary(&self) -> &Arc<TableSummary> {
self.table.table_summary()
&self.table_summary
}
/// Returns the name of the table this chunk holds
pub fn table_name(&self) -> &str {
self.table.name()
&self.table_summary.name
}
/// Return the approximate memory size of the chunk, in bytes including the
/// dictionary, tables, and their rows.
pub fn size(&self) -> usize {
self.table.size() + self.partition_key.len() + mem::size_of::<Self>()
mem::size_of::<Self>()
+ self.partition_key.len()
+ self.table_summary.size()
+ mem::size_of_val(&self.schema.as_ref())
+ mem::size_of_val(&self.object_store_path)
}
/// Return possibly restricted Schema for the table in this chunk
pub fn table_schema(&self, selection: Selection<'_>) -> Result<Schema> {
self.table.schema(selection).context(NamedTableError {
table_name: self.table_name(),
/// Return possibly restricted Schema for this chunk
pub fn schema(&self, selection: Selection<'_>) -> Result<Schema> {
Ok(match selection {
Selection::All => self.schema.as_ref().clone(),
Selection::Some(columns) => {
let columns = self.schema.select(columns).context(SelectColumns)?;
self.schema.project(&columns)
}
})
}
/// Infallably return the full schema (for all columns) for this chunk
pub fn full_schema(&self) -> Arc<Schema> {
self.table.full_schema()
Arc::clone(&self.schema)
}
// Return true if the table in this chunk contains values within the time range
// Return true if this chunk contains values within the time range
pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool {
self.table.matches_predicate(timestamp_range)
match (self.timestamp_range, timestamp_range) {
(Some(a), Some(b)) => !a.disjoint(b),
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
// the predicate
(_, None) => true,
}
}
// Return the columns names that belong to the given column
// selection
pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
self.table.column_names(selection)
let fields = self.schema.inner().fields().iter();
Some(match selection {
Selection::Some(cols) => fields
.filter_map(|x| {
if cols.contains(&x.name().as_str()) {
Some(x.name().clone())
} else {
None
}
})
.collect(),
Selection::All => fields.map(|x| x.name().clone()).collect(),
})
}
/// Return stream of data read from parquet file of the given table
/// Return stream of data read from parquet file
pub fn read_filter(
&self,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream> {
self.table
.read_filter(predicate, selection)
.context(ReadParquet {
table_name: self.table_name(),
})
Storage::read_filter(
predicate,
selection,
Arc::clone(&self.schema.as_arrow()),
self.object_store_path.clone(),
Arc::clone(&self.object_store),
)
.context(ReadParquet)
}
/// The total number of rows in all row groups in all tables in this chunk.
/// The total number of rows in all row groups in this chunk.
pub fn rows(&self) -> usize {
self.table.rows()
// All columns have the same rows, so return get row count of the first column
self.table_summary.columns[0].count() as usize
}
}
/// Extracts min/max values of the timestamp column, from the TableSummary, if possible
fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
table_summary
.column(TIME_COLUMN_NAME)
.map(|c| {
if let Statistics::I64(s) = &c.stats {
if let (Some(min), Some(max)) = (s.min, s.max) {
return Some(TimestampRange::new(min, max));
}
}
None
})
.flatten()
}


@@ -14,7 +14,6 @@ pub mod cleanup;
pub mod metadata;
pub mod rebuild;
pub mod storage;
pub mod table;
pub mod test_utils;
mod storage_testing;


@@ -551,7 +551,7 @@ mod tests {
// step 1: read back schema
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
let schema_expected = chunk.table_schema(Selection::All).unwrap();
let schema_expected = chunk.schema(Selection::All).unwrap();
assert_eq!(schema_actual, schema_expected);
// step 2: read back statistics
@@ -574,7 +574,7 @@ mod tests {
// step 1: read back schema
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
let schema_expected = chunk.table_schema(Selection::All).unwrap();
let schema_expected = chunk.schema(Selection::All).unwrap();
assert_eq!(schema_actual, schema_expected);
// step 2: read back statistics
@@ -595,7 +595,7 @@ mod tests {
// step 1: read back schema
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
let schema_expected = chunk.table_schema(Selection::All).unwrap();
let schema_expected = chunk.schema(Selection::All).unwrap();
assert_eq!(schema_actual, schema_expected);
// step 2: reading back statistics fails
@@ -618,7 +618,7 @@ mod tests {
// step 1: read back schema
let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
let schema_expected = chunk.table_schema(Selection::All).unwrap();
let schema_expected = chunk.schema(Selection::All).unwrap();
assert_eq!(schema_actual, schema_expected);
// step 2: reading back statistics fails


@@ -1,177 +0,0 @@
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, mem, sync::Arc};
use crate::storage::{self, Storage};
use data_types::{
partition_metadata::{Statistics, TableSummary},
timestamp::TimestampRange,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::{
schema::{Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use object_store::{path::Path, ObjectStore};
use query::predicate::Predicate;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to select columns: {}", source))]
SelectColumns {
source: internal_types::schema::Error,
},
#[snafu(display("Failed to read parquet: {}", source))]
ReadParquet { source: storage::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Table that belongs to a chunk persisted in a parquet file in object store
#[derive(Debug, Clone)]
pub struct Table {
/// Meta data of the table
table_summary: Arc<TableSummary>,
/// Path in the object store. Format:
/// <writer id>/<database>/data/<partition key>/<chunk
/// id>/<tablename>.parquet
object_store_path: Path,
/// Object store of the above relative path to open and read the file
object_store: Arc<ObjectStore>,
/// Schema that goes with this table's parquet file
table_schema: Arc<Schema>,
/// Timestamp range of this table's parquet file
/// (extracted from TableSummary)
timestamp_range: Option<TimestampRange>,
}
impl Table {
pub fn new(
table_summary: TableSummary,
path: Path,
store: Arc<ObjectStore>,
schema: Schema,
) -> Self {
let timestamp_range = extract_range(&table_summary);
Self {
table_summary: Arc::new(table_summary),
object_store_path: path,
object_store: store,
table_schema: Arc::new(schema),
timestamp_range,
}
}
pub fn table_summary(&self) -> &Arc<TableSummary> {
&self.table_summary
}
pub fn has_table(&self, table_name: &str) -> bool {
self.table_summary.has_table(table_name)
}
/// Return the approximate memory size of the table
pub fn size(&self) -> usize {
mem::size_of::<Self>()
+ self.table_summary.size()
+ mem::size_of_val(&self.object_store_path)
+ mem::size_of_val(&self.table_schema.as_ref())
}
/// Return name of this table
pub fn name(&self) -> &str {
&self.table_summary.name
}
/// Return the object store path of this table
pub fn path(&self) -> Path {
self.object_store_path.clone()
}
/// Return schema of this table for specified selection columns
pub fn schema(&self, selection: Selection<'_>) -> Result<Schema> {
Ok(match selection {
Selection::All => self.table_schema.as_ref().clone(),
Selection::Some(columns) => {
let columns = self.table_schema.select(columns).context(SelectColumns)?;
self.table_schema.project(&columns)
}
})
}
/// Infallably return the full schema (for all columns) for this chunk
pub fn full_schema(&self) -> Arc<Schema> {
Arc::clone(&self.table_schema)
}
// Check if 2 time ranges overlap
pub fn matches_predicate(&self, timestamp_range: Option<&TimestampRange>) -> bool {
match (self.timestamp_range, timestamp_range) {
(Some(a), Some(b)) => !a.disjoint(b),
(None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */
// the predicate
(_, None) => true,
}
}
// Return columns names of this table that belong to the given column selection
pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
let fields = self.table_schema.inner().fields().iter();
Some(match selection {
Selection::Some(cols) => fields
.filter_map(|x| {
if cols.contains(&x.name().as_str()) {
Some(x.name().clone())
} else {
None
}
})
.collect(),
Selection::All => fields.map(|x| x.name().clone()).collect(),
})
}
/// Return stream of data read from parquet file for given predicate and
/// column selection
pub fn read_filter(
&self,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream> {
Storage::read_filter(
predicate,
selection,
Arc::clone(&self.table_schema.as_arrow()),
self.object_store_path.clone(),
Arc::clone(&self.object_store),
)
.context(ReadParquet)
}
/// The number of rows of this table
pub fn rows(&self) -> usize {
// All columns have the same rows, so return get row count of the first column
self.table_summary.columns[0].count() as usize
}
}
/// Extracts min/max values of the timestamp column, from the TableSummary, if possible
fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
table_summary
.column(TIME_COLUMN_NAME)
.map(|c| {
if let Statistics::I64(s) = &c.stats {
if let (Some(min), Some(max)) = (s.min, s.max) {
return Some(TimestampRange::new(min, max));
}
}
None
})
.flatten()
}


@@ -66,7 +66,7 @@ pub async fn load_parquet_from_store_for_chunk(
chunk: &Chunk,
store: Arc<ObjectStore>,
) -> Result<(String, Vec<u8>)> {
let path = chunk.table_path();
let path = chunk.path();
let table_name = chunk.table_name().to_string();
Ok((
table_name,
@@ -584,7 +584,7 @@ pub async fn make_metadata(
.await
.unwrap();
(
chunk.table_path(),
chunk.path(),
read_parquet_metadata_from_file(parquet_data).unwrap(),
)
}


@@ -1461,7 +1461,7 @@ mod tests {
.eq(1.0)
.unwrap();
let expected_parquet_size = 759;
let expected_parquet_size = 647;
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1616).unwrap();
// now also in OS
catalog_chunk_size_bytes_metric_eq(
@@ -1817,7 +1817,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(2375.0)
.sample_sum_eq(2263.0)
.unwrap();
// it should be the same chunk!
@@ -1925,7 +1925,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(2375.0)
.sample_sum_eq(2263.0)
.unwrap();
// Unload RB chunk but keep it in OS
@@ -1953,7 +1953,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(759.0)
.sample_sum_eq(647.0)
.unwrap();
// Verify data written to the parquet file in object store
@@ -2342,7 +2342,7 @@ mod tests {
Arc::from("cpu"),
0,
ChunkStorage::ReadBufferAndObjectStore,
2373, // size of RB and OS chunks
2261, // size of RB and OS chunks
1,
),
ChunkSummary::new_without_timestamps(
@@ -2402,7 +2402,7 @@ mod tests {
.memory()
.parquet()
.get_total(),
759
647
);
}
@@ -2864,7 +2864,7 @@ mod tests {
let chunk = db.chunk(table_name, partition_key, *chunk_id).unwrap();
let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
paths_expected.push(parquet.table_path().display());
paths_expected.push(parquet.path().display());
} else {
panic!("Wrong chunk state.");
}
@@ -2944,7 +2944,7 @@ mod tests {
let chunk = db.chunk(&table_name, &partition_key, chunk_id).unwrap();
let chunk = chunk.read();
if let ChunkStage::Persisted { parquet, .. } = chunk.stage() {
paths_keep.push(parquet.table_path());
paths_keep.push(parquet.path());
} else {
panic!("Wrong chunk state.");
}


@@ -195,7 +195,7 @@ impl DbChunk {
/// persisted, if any
pub fn object_store_path(&self) -> Option<Path> {
match &self.state {
State::ParquetFile { chunk } => Some(chunk.table_path()),
State::ParquetFile { chunk } => Some(chunk.path()),
_ => None,
}
}