diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index b4856579b6..e753ce84b3 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -1,4 +1,4 @@
-use snafu::{OptionExt, ResultExt, Snafu};
+use snafu::{ResultExt, Snafu};
 use std::{collections::BTreeSet, sync::Arc};
 
 use crate::table::Table;
@@ -64,19 +64,30 @@ pub struct Chunk {
     /// Partition this chunk belongs to
     partition_key: String,
 
-    /// Tables of this chunk
-    tables: Vec<Table>,
+    /// The table in chunk
+    table: Table,
 
     metrics: ChunkMetrics,
 }
 
 impl Chunk {
-    pub fn new(part_key: String, metrics: ChunkMetrics) -> Self {
+    pub fn new(
+        part_key: impl Into<String>,
+        table_summary: TableSummary,
+        file_location: Path,
+        store: Arc<ObjectStore>,
+        schema: Schema,
+        range: Option<TimestampRange>,
+        metrics: ChunkMetrics,
+    ) -> Self {
+        let table = Table::new(table_summary, file_location, store, schema, range);
+
         let mut chunk = Self {
-            partition_key: part_key,
-            tables: Default::default(),
+            partition_key: part_key.into(),
+            table,
             metrics,
         };
+
         chunk.metrics.memory_bytes.set(chunk.size());
         chunk
     }
@@ -86,77 +97,42 @@ impl Chunk {
         self.partition_key.as_ref()
     }
 
-    /// Return all paths of this chunks
-    pub fn all_paths(&self) -> Vec<Path> {
-        self.tables.iter().map(|t| t.path()).collect()
+    /// Return object store path for this chunk
+    pub fn table_path(&self) -> Path {
+        self.table.path()
     }
 
-    /// Returns a vec of the summary statistics of the tables in this chunk
-    pub fn table_summaries(&self) -> Vec<TableSummary> {
-        self.tables.iter().map(|t| t.table_summary()).collect()
+    /// Returns the summary statistics for this chunk
+    pub fn table_summary(&self) -> TableSummary {
+        self.table.table_summary()
     }
 
-    /// Add a chunk's table and its summary
-    pub fn add_table(
-        &mut self,
-        table_summary: TableSummary,
-        file_location: Path,
-        store: Arc<ObjectStore>,
-        schema: Schema,
-        range: Option<TimestampRange>,
-    ) {
-        self.tables.push(Table::new(
-            table_summary,
-            file_location,
-            store,
-            schema,
-            range,
-        ));
+    /// Returns the name of the table this chunk holds
+    pub fn table_name(&self) -> &str {
+        self.table.name()
     }
 
     /// Return true if this chunk includes the given table
     pub fn has_table(&self, table_name: &str) -> bool {
-        self.tables.iter().any(|t| t.has_table(table_name))
-    }
-
-    // Return all tables of this chunk
-    pub fn all_table_names(&self, names: &mut BTreeSet<String>) {
-        let mut tables = self
-            .tables
-            .iter()
-            .map(|t| t.name())
-            .collect::<BTreeSet<String>>();
-
-        names.append(&mut tables);
+        self.table_name() == table_name
     }
 
     /// Return the approximate memory size of the chunk, in bytes including the
     /// dictionary, tables, and their rows.
     pub fn size(&self) -> usize {
-        let size: usize = self.tables.iter().map(|t| t.size()).sum();
-
-        size + self.partition_key.len() + mem::size_of::<u32>() + mem::size_of::<Self>()
+        self.table.size() + self.partition_key.len() + mem::size_of::<Self>()
     }
 
-    /// Return Schema for the specified table / columns
-    pub fn table_schema(&self, table_name: &str, selection: Selection<'_>) -> Result<Schema> {
-        let table = self.find_table(table_name)?;
-
-        table
-            .schema(selection)
-            .context(NamedTableError { table_name })
+    /// Return Schema for the table in this chunk
+    pub fn table_schema(&self, selection: Selection<'_>) -> Result<Schema> {
+        self.table.schema(selection).context(NamedTableError {
+            table_name: self.table_name(),
+        })
     }
 
-    /// Return object store path for specified table
-    pub fn table_path(&self, table_name: &str) -> Result<Path> {
-        let table = self.find_table(table_name)?;
-        Ok(table.path())
-    }
-
-    /// Return Schema for the specified table / columns
-    pub fn timestamp_range(&self, table_name: &str) -> Result<Option<TimestampRange>> {
-        let table = self.find_table(table_name)?;
-        Ok(table.timestamp_range())
+    /// Return the timestamp range of the table
+    pub fn timestamp_range(&self) -> Option<TimestampRange> {
+        self.table.timestamp_range()
     }
 
     // Return all tables of this chunk whose timestamp overlaps with the give one
@@ -164,28 +140,15 @@ impl Chunk {
         &self,
         timestamp_range: Option<TimestampRange>,
     ) -> impl Iterator<Item = String> + '_ {
-        self.tables.iter().flat_map(move |t| {
-            if t.matches_predicate(&timestamp_range) {
-                Some(t.name())
-            } else {
-                None
-            }
-        })
+        std::iter::once(&self.table)
+            .filter(move |table| table.matches_predicate(&timestamp_range))
+            .map(|table| table.name().to_string())
     }
 
-    // Return columns names of a given table that belong to the given column
+    // Return the columns names that belong to the given column
    // selection
-    pub fn column_names(
-        &self,
-        table_name: &str,
-        selection: Selection<'_>,
-    ) -> Option<BTreeSet<String>> {
-        let table = self.find_table(table_name);
-
-        match table {
-            Ok(table) => table.column_names(selection),
-            Err(_) => None,
-        }
+    pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
+        self.table.column_names(selection)
     }
 
     /// Return stream of data read from parquet file of the given table
@@ -195,22 +158,13 @@ impl Chunk {
         predicate: &Predicate,
         selection: Selection<'_>,
     ) -> Result<SendableRecordBatchStream> {
-        let table = self.find_table(table_name)?;
-
-        table
+        self.table
             .read_filter(predicate, selection)
             .context(ReadParquet { table_name })
     }
 
     /// The total number of rows in all row groups in all tables in this chunk.
     pub fn rows(&self) -> usize {
-        self.tables.iter().map(|t| t.rows()).sum()
-    }
-
-    fn find_table(&self, table_name: &str) -> Result<&Table> {
-        self.tables
-            .iter()
-            .find(|t| t.has_table(table_name))
-            .context(NamedTableNotFoundInChunk { table_name })
+        self.table.rows()
     }
 }
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 4553fdf8fd..67feec455f 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -492,15 +492,15 @@ mod tests {
 
         // step 1: read back schema
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
-        let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
+        let schema_expected = chunk.table_schema(Selection::All).unwrap();
         assert_eq!(schema_actual, schema_expected);
 
         // step 2: read back statistics
         let (table_summary_actual, timestamp_range_actual) =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
-        let table_summary_expected = chunk.table_summaries().first().cloned().unwrap();
-        let timestamp_range_expected = chunk.timestamp_range(&table).unwrap();
+        let table_summary_expected = chunk.table_summary();
+        let timestamp_range_expected = chunk.timestamp_range();
         assert_eq!(table_summary_actual, table_summary_expected);
         assert_eq!(timestamp_range_actual, timestamp_range_expected)
     }
@@ -517,15 +517,15 @@ mod tests {
 
         // step 1: read back schema
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
-        let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
+        let schema_expected = chunk.table_schema(Selection::All).unwrap();
         assert_eq!(schema_actual, schema_expected);
 
         // step 2: read back statistics
         let (table_summary_actual, timestamp_range_actual) =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
-        let table_summary_expected = chunk.table_summaries().first().cloned().unwrap();
-        let timestamp_range_expected = chunk.timestamp_range(&table).unwrap();
+        let table_summary_expected = chunk.table_summary();
+        let timestamp_range_expected = chunk.timestamp_range();
         assert_eq!(table_summary_actual, table_summary_expected);
         assert_eq!(timestamp_range_actual, timestamp_range_expected)
     }
@@ -540,7 +540,7 @@ mod tests {
 
         // step 1: read back schema
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
-        let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
+        let schema_expected = chunk.table_schema(Selection::All).unwrap();
         assert_eq!(schema_actual, schema_expected);
 
         // step 2: reading back statistics fails
@@ -563,7 +563,7 @@ mod tests {
 
         // step 1: read back schema
         let schema_actual = read_schema_from_parquet_metadata(&parquet_metadata).unwrap();
-        let schema_expected = chunk.table_schema(&table, Selection::All).unwrap();
+        let schema_expected = chunk.table_schema(Selection::All).unwrap();
         assert_eq!(schema_actual, schema_expected);
 
         // step 2: reading back statistics fails
@@ -592,7 +592,7 @@ mod tests {
 
         // column count in summary including the timestamp column
         assert_eq!(
-            chunk.table_summaries().first().unwrap().columns.len(),
+            chunk.table_summary().columns.len(),
             parquet_metadata
                 .file_metadata()
                 .schema_descr()
@@ -602,7 +602,7 @@ mod tests {
         // check that column counts are consistent
         let n_rows = parquet_metadata.file_metadata().num_rows() as u64;
         assert!(n_rows >= parquet_metadata.num_row_groups() as u64);
-        for summary in &chunk.table_summaries().first().unwrap().columns {
+        for summary in &chunk.table_summary().columns {
             assert_eq!(summary.count(), n_rows);
         }
 
@@ -631,7 +631,7 @@ mod tests {
 
         // column count in summary including the timestamp column
         assert_eq!(
-            chunk.table_summaries().first().unwrap().columns.len(),
+            chunk.table_summary().columns.len(),
             parquet_metadata
                 .file_metadata()
                 .schema_descr()
diff --git a/parquet_file/src/table.rs b/parquet_file/src/table.rs
index 513047107d..26c4fec778 100644
--- a/parquet_file/src/table.rs
+++ b/parquet_file/src/table.rs
@@ -76,8 +76,8 @@ impl Table {
     }
 
     /// Return name of this table
-    pub fn name(&self) -> String {
-        self.table_summary.name.clone()
+    pub fn name(&self) -> &str {
+        &self.table_summary.name
     }
 
     /// Return the object store path of this table
diff --git a/parquet_file/src/utils.rs b/parquet_file/src/utils.rs
index c5240e31c5..c3e10f6500 100644
--- a/parquet_file/src/utils.rs
+++ b/parquet_file/src/utils.rs
@@ -32,7 +32,7 @@ use crate::{
     chunk::{self, Chunk},
     storage::Storage,
 };
-use snafu::{OptionExt, ResultExt, Snafu};
+use snafu::{ResultExt, Snafu};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -60,12 +60,12 @@ pub async fn load_parquet_from_store_for_chunk(
     chunk: &Chunk,
     store: Arc<ObjectStore>,
 ) -> Result<(String, Vec<u8>)> {
-    let table = chunk
-        .table_names(None)
-        .next()
-        .context(LoadingFromObjectStore)?;
-    let path = chunk.table_path(&table).context(ReadingChunk)?;
-    Ok((table, load_parquet_from_store_for_path(&path, store).await?))
+    let path = chunk.table_path();
+    let table_name = chunk.table_name().to_string();
+    Ok((
+        table_name,
+        load_parquet_from_store_for_path(&path, store).await?,
+    ))
 }
 
 pub async fn load_parquet_from_store_for_path(
@@ -151,7 +151,6 @@ async fn make_chunk_common(
     let part_key = "part1";
     let table_name = table;
     let chunk_id = 1;
-    let mut chunk = Chunk::new(part_key.to_string(), ChunkMetrics::new_unregistered());
 
     let storage = Storage::new(Arc::clone(&store), server_id, db_name.to_string());
 
@@ -174,15 +173,16 @@ async fn make_chunk_common(
     )
     .await
     .unwrap();
-    chunk.add_table(
+
+    Chunk::new(
+        part_key,
         table_summary,
         path,
         Arc::clone(&store),
         schema,
         Some(time_range),
-    );
-
-    chunk
+        ChunkMetrics::new_unregistered(),
+    )
 }
 
 fn create_column_tag(
diff --git a/server/src/db.rs b/server/src/db.rs
index 6c7c520379..a8938cb9a1 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -1147,16 +1147,16 @@ impl CatalogState for Catalog {
             let metrics = self
                 .metrics_registry
                 .register_domain_with_labels("parquet", self.metric_labels.clone());
-            let mut parquet_chunk = ParquetChunk::new(
-                partition_key.to_string(),
-                ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet()),
-            );
-            parquet_chunk.add_table(
+
+            let metrics = ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet());
+            let parquet_chunk = ParquetChunk::new(
+                &partition_key,
                 table_summary,
                 object_store.path_from_dirs_and_filename(info.path.clone()),
                 object_store,
                 schema,
                 timestamp_range,
+                metrics,
             );
             let parquet_chunk = Arc::new(parquet_chunk);
 
@@ -1420,9 +1420,15 @@ mod tests {
             .eq(1.0)
             .unwrap();
 
+        let expected_parquet_size = 807;
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1598).unwrap();
         // now also in OS
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "parquet", 81).unwrap(); // TODO: #1311
+        catalog_chunk_size_bytes_metric_eq(
+            &test_db.metric_registry,
+            "parquet",
+            expected_parquet_size,
+        )
+        .unwrap(); // TODO: #1311
 
         db.unload_read_buffer("1970-01-01T00", "cpu", 0)
             .await
@@ -1438,7 +1444,12 @@ mod tests {
             .unwrap();
 
         // verify chunk size not increased for OS (it was in OS before unload)
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "parquet", 81).unwrap();
+        catalog_chunk_size_bytes_metric_eq(
+            &test_db.metric_registry,
+            "parquet",
+            expected_parquet_size,
+        )
+        .unwrap();
         // verify chunk size for RB has decreased
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0).unwrap();
     }
@@ -1765,7 +1776,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2281.0)
+            .sample_sum_eq(2405.0)
             .unwrap();
 
         // it should be the same chunk!
@@ -1874,7 +1885,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2281.0)
+            .sample_sum_eq(2405.0)
             .unwrap();
 
         // Unload RB chunk but keep it in OS
@@ -1902,7 +1913,7 @@ mod tests {
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(683.0)
+            .sample_sum_eq(807.0)
             .unwrap();
 
         // Verify data written to the parquet file in object store
@@ -2263,7 +2274,7 @@ mod tests {
                 Arc::from("cpu"),
                 0,
                 ChunkStorage::ReadBufferAndObjectStore,
-                2272, // size of RB and OS chunks
+                2396, // size of RB and OS chunks
                 1,
             ),
             ChunkSummary::new_without_timestamps(
@@ -2318,8 +2329,8 @@ mod tests {
         );
         assert_eq!(
             db.catalog.state().metrics().memory().parquet().get_total(),
-            81
-        ); // TODO: This 89 must be replaced with 691. Ticket #1311
+            807
+        );
     }
 
     #[tokio::test]
@@ -2867,7 +2878,7 @@ mod tests {
             };
             let chunk = chunk.read();
             if let ChunkState::WrittenToObjectStore(_, chunk) = chunk.state() {
-                paths_expected.push(chunk.table_path("cpu").unwrap().display());
+                paths_expected.push(chunk.table_path().display());
             } else {
                 panic!("Wrong chunk state.");
             }
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index e5a63194f3..3abb0c9b0f 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -353,11 +353,7 @@ impl Chunk {
                 assert_eq!(summaries.len(), 1);
                 summaries.remove(0)
             }
-            ChunkState::ObjectStoreOnly(chunk) => {
-                let mut summaries = chunk.table_summaries();
-                assert_eq!(summaries.len(), 1);
-                summaries.remove(0)
-            }
+            ChunkState::ObjectStoreOnly(chunk) => chunk.table_summary(),
         }
     }
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 750e66b4f3..bf371abc4f 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -149,7 +149,7 @@ impl DbChunk {
     /// Return object store paths
     pub fn object_store_paths(&self) -> Vec<Path> {
         match &self.state {
-            State::ParquetFile { chunk } => chunk.all_paths(),
+            State::ParquetFile { chunk } => vec![chunk.table_path()],
             _ => vec![],
         }
     }
@@ -174,7 +174,12 @@ impl PartitionChunk for DbChunk {
                     known_tables.insert(name);
                 }
             }
-            State::ParquetFile { chunk, .. } => chunk.all_table_names(known_tables),
+            State::ParquetFile { chunk, .. } => {
+                let table_name = chunk.table_name();
+                if !known_tables.contains(table_name) {
+                    known_tables.insert(table_name.to_string());
+                }
+            }
         }
     }
 
@@ -251,7 +256,7 @@ impl PartitionChunk for DbChunk {
             }
             State::ParquetFile { chunk, .. } => {
                 chunk
-                    .table_schema(table_name, selection)
+                    .table_schema(selection)
                     .context(ParquetFileChunkError {
                         chunk_id: self.id(),
                     })
@@ -353,7 +358,7 @@ impl PartitionChunk for DbChunk {
                     // TODO: Support predicates when MB supports it
                     return Ok(None);
                 }
-                Ok(chunk.column_names(table_name, columns))
+                Ok(chunk.column_names(columns))
             }
         }
     }
diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs
index e1404a92cb..a36ff66236 100644
--- a/server/src/db/lifecycle.rs
+++ b/server/src/db/lifecycle.rs
@@ -370,8 +370,9 @@ fn can_move(rules: &LifecycleRules, chunk: &Chunk, now: DateTime<Utc>) -> bool {
 mod tests {
     use super::*;
     use crate::db::catalog::chunk::ChunkMetrics;
-    use data_types::server_id::ServerId;
+    use data_types::{partition_metadata::TableSummary, server_id::ServerId};
     use entry::{test_helpers::lp_to_entry, ClockValue};
+    use object_store::{memory::InMemory, parsed_path, ObjectStore};
     use std::{
         convert::TryFrom,
         num::{NonZeroU32, NonZeroUsize},
@@ -451,8 +452,23 @@ mod tests {
     }
 
     fn new_parquet_chunk(chunk: &Chunk) -> parquet_file::chunk::Chunk {
+        let in_memory = InMemory::new();
+
+        let schema = internal_types::schema::builder::SchemaBuilder::new()
+            .tag("foo")
+            .build()
+            .unwrap();
+
+        let object_store = Arc::new(ObjectStore::new_in_memory(in_memory));
+        let path = object_store.path_from_dirs_and_filename(parsed_path!("foo"));
+
         parquet_file::chunk::Chunk::new(
-            chunk.key().to_string(),
+            chunk.key(),
+            TableSummary::new("my_awesome_table"),
+            path,
+            object_store,
+            schema,
+            None,
             parquet_file::chunk::ChunkMetrics::new_unregistered(),
         )
     }
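
For reviewers, here is a minimal sketch (not part of the patch) of how the new single-table constructor is called, modeled on the `new_parquet_chunk` test helper above; the `"part1"` key, `"my_table"` name, `"region"` tag, and in-memory object store are illustrative placeholders:

```rust
use std::sync::Arc;

use data_types::partition_metadata::TableSummary;
use internal_types::schema::builder::SchemaBuilder;
use object_store::{memory::InMemory, parsed_path, ObjectStore};
use parquet_file::chunk::{Chunk, ChunkMetrics};

fn example_chunk() -> Chunk {
    // Back the chunk with an in-memory object store, as the lifecycle test does.
    let object_store = Arc::new(ObjectStore::new_in_memory(InMemory::new()));
    let path = object_store.path_from_dirs_and_filename(parsed_path!("my_table"));

    // Hypothetical single-tag schema; any valid Schema works here.
    let schema = SchemaBuilder::new().tag("region").build().unwrap();

    // All table state is passed up front; there is no add_table() step anymore.
    Chunk::new(
        "part1",                       // partition key (impl Into<String>)
        TableSummary::new("my_table"), // summary of the chunk's single table
        path,                          // parquet file location in object store
        object_store,
        schema,
        None, // optional TimestampRange
        ChunkMetrics::new_unregistered(),
    )
}
```

Because `Chunk::new` now computes `size()` and records it in `metrics.memory_bytes` only after the table is attached, memory accounting is correct from construction onward; under the old `new` + `add_table` flow the size was captured while `tables` was still empty, which appears to be why the expected parquet sizes in the tests above move from 81 to 807 bytes.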