refactor: Remove multiple table support from Parquet Chunk (#1541)

pull/24376/head
Andrew Lamb 2021-05-24 08:40:31 -04:00 committed by GitHub
parent ee06ca4c7d
commit 27e5b8fabf
8 changed files with 122 additions and 140 deletions
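In short: `Chunk` used to be created empty and populated one table at a time with `add_table`, and its accessors took a `table_name`; after this commit a `Chunk` wraps exactly one `Table` that is supplied to the constructor, and the per-table lookups drop the name argument. A hedged before/after sketch of a call site, using only the signatures visible in the diff below — the argument values (`part_key`, `table_summary`, `path`, `store`, `schema`, `time_range`) are placeholders, and the snippet assumes the surrounding IOx crates rather than compiling on its own:

    // Before: two-step construction, name-keyed lookups.
    // let mut chunk = Chunk::new(part_key.to_string(), ChunkMetrics::new_unregistered());
    // chunk.add_table(table_summary, path, Arc::clone(&store), schema, Some(time_range));
    // let summary = chunk.table_summaries().first().cloned().unwrap();
    // let range = chunk.timestamp_range("cpu").unwrap();

    // After: the single table is passed to the constructor; lookups need no table name.
    let chunk = Chunk::new(
        part_key,                          // impl Into<String>
        table_summary,                     // TableSummary
        path,                              // object store Path of the parquet file
        Arc::clone(&store),                // Arc<ObjectStore>
        schema,                            // Schema
        Some(time_range),                  // Option<TimestampRange>
        ChunkMetrics::new_unregistered(),
    );
    let summary = chunk.table_summary();
    let range = chunk.timestamp_range();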

View File

@@ -1,4 +1,4 @@
-use snafu::{OptionExt, ResultExt, Snafu};
+use snafu::{ResultExt, Snafu};
 use std::{collections::BTreeSet, sync::Arc};
 use crate::table::Table;
@@ -64,19 +64,30 @@ pub struct Chunk {
     /// Partition this chunk belongs to
     partition_key: String,

-    /// Tables of this chunk
-    tables: Vec<Table>,
+    /// The table in chunk
+    table: Table,

     metrics: ChunkMetrics,
 }

 impl Chunk {
-    pub fn new(part_key: String, metrics: ChunkMetrics) -> Self {
+    pub fn new(
+        part_key: impl Into<String>,
+        table_summary: TableSummary,
+        file_location: Path,
+        store: Arc<ObjectStore>,
+        schema: Schema,
+        range: Option<TimestampRange>,
+        metrics: ChunkMetrics,
+    ) -> Self {
+        let table = Table::new(table_summary, file_location, store, schema, range);
+
         let mut chunk = Self {
-            partition_key: part_key,
-            tables: Default::default(),
+            partition_key: part_key.into(),
+            table,
             metrics,
         };

         chunk.metrics.memory_bytes.set(chunk.size());
         chunk
     }
@@ -86,77 +97,42 @@ impl Chunk {
         self.partition_key.as_ref()
     }

-    /// Return all paths of this chunks
-    pub fn all_paths(&self) -> Vec<Path> {
-        self.tables.iter().map(|t| t.path()).collect()
+    /// Return object store path for this chunk
+    pub fn table_path(&self) -> Path {
+        self.table.path()
     }

-    /// Returns a vec of the summary statistics of the tables in this chunk
-    pub fn table_summaries(&self) -> Vec<TableSummary> {
-        self.tables.iter().map(|t| t.table_summary()).collect()
+    /// Returns the summary statistics for this chunk
+    pub fn table_summary(&self) -> TableSummary {
+        self.table.table_summary()
     }

-    /// Add a chunk's table and its summary
-    pub fn add_table(
-        &mut self,
-        table_summary: TableSummary,
-        file_location: Path,
-        store: Arc<ObjectStore>,
-        schema: Schema,
-        range: Option<TimestampRange>,
-    ) {
-        self.tables.push(Table::new(
-            table_summary,
-            file_location,
-            store,
-            schema,
-            range,
-        ));
+    /// Returns the name of the table this chunk holds
+    pub fn table_name(&self) -> &str {
+        self.table.name()
     }

     /// Return true if this chunk includes the given table
     pub fn has_table(&self, table_name: &str) -> bool {
-        self.tables.iter().any(|t| t.has_table(table_name))
-    }
-
-    // Return all tables of this chunk
-    pub fn all_table_names(&self, names: &mut BTreeSet<String>) {
-        let mut tables = self
-            .tables
-            .iter()
-            .map(|t| t.name())
-            .collect::<BTreeSet<String>>();
-
-        names.append(&mut tables);
+        self.table_name() == table_name
     }

     /// Return the approximate memory size of the chunk, in bytes including the
     /// dictionary, tables, and their rows.
     pub fn size(&self) -> usize {
-        let size: usize = self.tables.iter().map(|t| t.size()).sum();
-        size + self.partition_key.len() + mem::size_of::<u32>() + mem::size_of::<Self>()
+        self.table.size() + self.partition_key.len() + mem::size_of::<Self>()
     }

-    /// Return Schema for the specified table / columns
-    pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
-        let table = self.find_table(table_name)?;
-        table
-            .schema(selection)
-            .context(NamedTableError { table_name })
+    /// Return Schema for the table in this chunk
+    pub fn table_schema(&self, selection: Selection<'_>) -> Result<Schema> {
+        self.table.schema(selection).context(NamedTableError {
+            table_name: self.table_name(),
+        })
     }

-    /// Return object store path for specified table
-    pub fn table_path(&self, table_name: &str) -> Result<Path> {
-        let table = self.find_table(table_name)?;
-        Ok(table.path())
-    }
-
-    /// Return Schema for the specified table / columns
-    pub fn timestamp_range(&self, table_name: &str) -> Result<Option<TimestampRange>> {
-        let table = self.find_table(table_name)?;
-        Ok(table.timestamp_range())
+    /// Return the timestamp range of the table
+    pub fn timestamp_range(&self) -> Option<TimestampRange> {
+        self.table.timestamp_range()
     }

     // Return all tables of this chunk whose timestamp overlaps with the give one
@@ -164,28 +140,15 @@ impl Chunk {
         &self,
         timestamp_range: Option<TimestampRange>,
     ) -> impl Iterator<Item = String> + '_ {
-        self.tables.iter().flat_map(move |t| {
-            if t.matches_predicate(&timestamp_range) {
-                Some(t.name())
-            } else {
-                None
-            }
-        })
+        std::iter::once(&self.table)
+            .filter(move |table| table.matches_predicate(&timestamp_range))
+            .map(|table| table.name().to_string())
     }

-    // Return columns names of a given table that belong to the given column
+    // Return the columns names that belong to the given column
     // selection
-    pub fn column_names(
-        &self,
-        table_name: &str,
-        selection: Selection<'_>,
-    ) -> Option<BTreeSet<String>> {
-        let table = self.find_table(table_name);
-        match table {
-            Ok(table) => table.column_names(selection),
-            Err(_) => None,
-        }
+    pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
+        self.table.column_names(selection)
     }

     /// Return stream of data read from parquet file of the given table
@@ -195,22 +158,13 @@ impl Chunk {
         predicate: &Predicate,
         selection: Selection<'_>,
     ) -> Result<SendableRecordBatchStream> {
-        let table = self.find_table(table_name)?;
-        table
+        self.table
             .read_filter(predicate, selection)
             .context(ReadParquet { table_name })
     }

     /// The total number of rows in all row groups in all tables in this chunk.
     pub fn rows(&self) -> usize {
-        self.tables.iter().map(|t| t.rows()).sum()
-    }
-
-    fn find_table(&self, table_name: &str) -> Result<&Table> {
-        self.tables
-            .iter()
-            .find(|t| t.has_table(table_name))
-            .context(NamedTableNotFoundInChunk { table_name })
+        self.table.rows()
     }
 }

View File

@@ -492,15 +492,15 @@ mod tests {
         // step 1: read back schema
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
-        let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
+        let schema_expected = chunk.table_schema(Selection::All).unwrap();
         assert_eq!(schema_actual, schema_expected);

         // step 2: read back statistics
         let (table_summary_actual, timestamp_range_actual) =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
-        let table_summary_expected = chunk.table_summaries().first().cloned().unwrap();
-        let timestamp_range_expected = chunk.timestamp_range(&table).unwrap();
+        let table_summary_expected = chunk.table_summary();
+        let timestamp_range_expected = chunk.timestamp_range();
         assert_eq!(table_summary_actual, table_summary_expected);
         assert_eq!(timestamp_range_actual, timestamp_range_expected)
     }
@@ -517,15 +517,15 @@ mod tests {
         // step 1: read back schema
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
-        let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
+        let schema_expected = chunk.table_schema(Selection::All).unwrap();
         assert_eq!(schema_actual, schema_expected);

         // step 2: read back statistics
         let (table_summary_actual, timestamp_range_actual) =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
-        let table_summary_expected = chunk.table_summaries().first().cloned().unwrap();
-        let timestamp_range_expected = chunk.timestamp_range(&table).unwrap();
+        let table_summary_expected = chunk.table_summary();
+        let timestamp_range_expected = chunk.timestamp_range();
         assert_eq!(table_summary_actual, table_summary_expected);
         assert_eq!(timestamp_range_actual, timestamp_range_expected)
     }
@@ -540,7 +540,7 @@ mod tests {
         // step 1: read back schema
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
-        let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
+        let schema_expected = chunk.table_schema(Selection::All).unwrap();
         assert_eq!(schema_actual, schema_expected);

         // step 2: reading back statistics fails
@@ -563,7 +563,7 @@ mod tests {
         // step 1: read back schema
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
-        let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
+        let schema_expected = chunk.table_schema(Selection::All).unwrap();
         assert_eq!(schema_actual, schema_expected);

         // step 2: reading back statistics fails
@@ -592,7 +592,7 @@ mod tests {
         // column count in summary including the timestamp column
         assert_eq!(
-            chunk.table_summaries().first().unwrap().columns.len(),
+            chunk.table_summary().columns.len(),
             parquet_metadata
                 .file_metadata()
                 .schema_descr()
@@ -602,7 +602,7 @@ mod tests {
         // check that column counts are consistent
         let n_rows = parquet_metadata.file_metadata().num_rows() as u64;
         assert!(n_rows >= parquet_metadata.num_row_groups() as u64);
-        for summary in &chunk.table_summaries().first().unwrap().columns {
+        for summary in &chunk.table_summary().columns {
             assert_eq!(summary.count(), n_rows);
         }
@@ -631,7 +631,7 @@ mod tests {
         // column count in summary including the timestamp column
         assert_eq!(
-            chunk.table_summaries().first().unwrap().columns.len(),
+            chunk.table_summary().columns.len(),
             parquet_metadata
                 .file_metadata()
                 .schema_descr()

View File

@@ -76,8 +76,8 @@ impl Table {
     }

     /// Return name of this table
-    pub fn name(&self) -> String {
-        self.table_summary.name.clone()
+    pub fn name(&self) -> &str {
+        &self.table_summary.name
     }

     /// Return the object store path of this table

View File

@@ -32,7 +32,7 @@ use crate::{
     chunk::{self, Chunk},
     storage::Storage,
 };
-use snafu::{OptionExt, ResultExt, Snafu};
+use snafu::{ResultExt, Snafu};

 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -60,12 +60,12 @@ pub async fn load_parquet_from_store_for_chunk(
     chunk: &Chunk,
     store: Arc<ObjectStore>,
 ) -> Result<(String, Vec<u8>)> {
-    let table = chunk
-        .table_names(None)
-        .next()
-        .context(LoadingFromObjectStore)?;
-    let path = chunk.table_path(&table).context(ReadingChunk)?;
-    Ok((table, load_parquet_from_store_for_path(&path, store).await?))
+    let path = chunk.table_path();
+    let table_name = chunk.table_name().to_string();
+    Ok((
+        table_name,
+        load_parquet_from_store_for_path(&path, store).await?,
+    ))
 }

 pub async fn load_parquet_from_store_for_path(
@@ -151,7 +151,6 @@ async fn make_chunk_common(
     let part_key = "part1";
     let table_name = table;
     let chunk_id = 1;
-    let mut chunk = Chunk::new(part_key.to_string(), ChunkMetrics::new_unregistered());

     let storage = Storage::new(Arc::clone(&store), server_id, db_name.to_string());
@@ -174,15 +173,16 @@ async fn make_chunk_common(
     )
     .await
     .unwrap();

-    chunk.add_table(
+    Chunk::new(
+        part_key,
         table_summary,
         path,
         Arc::clone(&store),
         schema,
         Some(time_range),
-    );
-
-    chunk
+        ChunkMetrics::new_unregistered(),
+    )
 }

 fn create_column_tag(

View File

@@ -1147,16 +1147,16 @@ impl CatalogState for Catalog {
                 let metrics = self
                     .metrics_registry
                     .register_domain_with_labels("parquet", self.metric_labels.clone());
-                let mut parquet_chunk = ParquetChunk::new(
-                    partition_key.to_string(),
-                    ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet()),
-                );
-                parquet_chunk.add_table(
+
+                let metrics = ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet());
+                let parquet_chunk = ParquetChunk::new(
+                    &partition_key,
                     table_summary,
                     object_store.path_from_dirs_and_filename(info.path.clone()),
                     object_store,
                     schema,
                     timestamp_range,
+                    metrics,
                 );
                 let parquet_chunk = Arc::new(parquet_chunk);
@@ -1420,9 +1420,15 @@ mod tests {
             .eq(1.0)
             .unwrap();

+        let expected_parquet_size = 807;
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1598).unwrap();
         // now also in OS
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "parquet", 81).unwrap(); // TODO: #1311
+        catalog_chunk_size_bytes_metric_eq(
+            &test_db.metric_registry,
+            "parquet",
+            expected_parquet_size,
+        )
+        .unwrap(); // TODO: #1311

         db.unload_read_buffer("1970-01-01T00", "cpu", 0)
             .await
@@ -1438,7 +1444,12 @@ mod tests {
             .unwrap();

         // verify chunk size not increased for OS (it was in OS before unload)
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "parquet", 81).unwrap();
+        catalog_chunk_size_bytes_metric_eq(
+            &test_db.metric_registry,
+            "parquet",
+            expected_parquet_size,
+        )
+        .unwrap();
         // verify chunk size for RB has decreased
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0).unwrap();
     }
@@ -1765,7 +1776,7 @@ mod tests {
             ("svr_id", "10"),
         ])
         .histogram()
-        .sample_sum_eq(2281.0)
+        .sample_sum_eq(2405.0)
         .unwrap();

         // it should be the same chunk!
@@ -1874,7 +1885,7 @@ mod tests {
            ("svr_id", "10"),
         ])
         .histogram()
-        .sample_sum_eq(2281.0)
+        .sample_sum_eq(2405.0)
         .unwrap();

         // Unload RB chunk but keep it in OS
@@ -1902,7 +1913,7 @@ mod tests {
            ("svr_id", "10"),
         ])
         .histogram()
-        .sample_sum_eq(683.0)
+        .sample_sum_eq(807.0)
         .unwrap();

         // Verify data written to the parquet file in object store
@@ -2263,7 +2274,7 @@ mod tests {
                 Arc::from("cpu"),
                 0,
                 ChunkStorage::ReadBufferAndObjectStore,
-                2272, // size of RB and OS chunks
+                2396, // size of RB and OS chunks
                 1,
             ),
             ChunkSummary::new_without_timestamps(
@@ -2318,8 +2329,8 @@ mod tests {
         );
         assert_eq!(
             db.catalog.state().metrics().memory().parquet().get_total(),
-            81
-        ); // TODO: This 89 must be replaced with 691. Ticket #1311
+            807
+        );
     }

     #[tokio::test]
@@ -2867,7 +2878,7 @@ mod tests {
         };
         let chunk = chunk.read();
         if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() {
-            paths_expected.push(chunk.table_path("cpu").unwrap().display());
+            paths_expected.push(chunk.table_path().display());
         } else {
             panic!("Wrong chunk state.");
         }

View File

@@ -353,11 +353,7 @@ impl Chunk {
                 assert_eq!(summaries.len(), 1);
                 summaries.remove(0)
             }
-            ChunkState::ObjectStoreOnly(chunk) => {
-                let mut summaries = chunk.table_summaries();
-                assert_eq!(summaries.len(), 1);
-                summaries.remove(0)
-            }
+            ChunkState::ObjectStoreOnly(chunk) => chunk.table_summary(),
         }
     }

View File

@@ -149,7 +149,7 @@ impl DbChunk {
     /// Return object store paths
     pub fn object_store_paths(&self) -> Vec<Path> {
         match &self.state {
-            State::ParquetFile { chunk } => chunk.all_paths(),
+            State::ParquetFile { chunk } => vec![chunk.table_path()],
             _ => vec![],
         }
     }
@@ -174,7 +174,12 @@ impl PartitionChunk for DbChunk {
                     known_tables.insert(name);
                 }
             }
-            State::ParquetFile { chunk, .. } => chunk.all_table_names(known_tables),
+            State::ParquetFile { chunk, .. } => {
+                let table_name = chunk.table_name();
+                if !known_tables.contains(table_name) {
+                    known_tables.insert(table_name.to_string());
+                }
+            }
         }
     }
@@ -251,7 +256,7 @@ impl PartitionChunk for DbChunk {
             }
             State::ParquetFile { chunk, .. } => {
                 chunk
-                    .table_schema(table_name, selection)
+                    .table_schema(selection)
                     .context(ParquetFileChunkError {
                         chunk_id: self.id(),
                     })
@@ -353,7 +358,7 @@ impl PartitionChunk for DbChunk {
                    // TODO: Support predicates when MB supports it
                    return Ok(None);
                }
-                Ok(chunk.column_names(table_name, columns))
+                Ok(chunk.column_names(columns))
            }
        }
    }

View File

@@ -370,8 +370,9 @@ fn can_move(rules: &LifecycleRules, chunk: &Chunk, now: DateTime<Utc>) -> bool {
 mod tests {
     use super::*;
     use crate::db::catalog::chunk::ChunkMetrics;
-    use data_types::server_id::ServerId;
+    use data_types::{partition_metadata::TableSummary, server_id::ServerId};
     use entry::{test_helpers::lp_to_entry, ClockValue};
+    use object_store::{memory::InMemory, parsed_path, ObjectStore};
     use std::{
         convert::TryFrom,
         num::{NonZeroU32, NonZeroUsize},
@@ -451,8 +452,23 @@ mod tests {
     }

     fn new_parquet_chunk(chunk: &Chunk) -> parquet_file::chunk::Chunk {
+        let in_memory = InMemory::new();
+
+        let schema = internal_types::schema::builder::SchemaBuilder::new()
+            .tag("foo")
+            .build()
+            .unwrap();
+
+        let object_store = Arc::new(ObjectStore::new_in_memory(in_memory));
+        let path = object_store.path_from_dirs_and_filename(parsed_path!("foo"));
+
         parquet_file::chunk::Chunk::new(
-            chunk.key().to_string(),
+            chunk.key(),
+            TableSummary::new("my_awesome_table"),
+            path,
+            object_store,
+            schema,
+            None,
             parquet_file::chunk::ChunkMetrics::new_unregistered(),
         )
     }