diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs
index e59a15aca8..13a1baf9a4 100644
--- a/querier/src/cache/parquet_file.rs
+++ b/querier/src/cache/parquet_file.rs
@@ -10,10 +10,9 @@ use cache_system::{
     cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
     loader::{metrics::MetricsLoader, FunctionLoader},
 };
-use data_types::{ParquetFileWithMetadata, SequenceNumber, TableId};
+use data_types::{ParquetFile, SequenceNumber, TableId};
 use iox_catalog::interface::Catalog;
 use iox_time::TimeProvider;
-use parquet_file::chunk::DecodedParquetFile;
 use snafu::{ResultExt, Snafu};
 use std::{collections::HashMap, mem, sync::Arc};
 
@@ -30,20 +29,16 @@ pub enum Error {
     },
 }
 
+/// Holds catalog information about a parquet file
 #[derive(Debug)]
-/// Holds decoded catalog information about a parquet file
 pub struct CachedParquetFiles {
-    /// Parquet catalog information and decoded metadata
-    pub files: Arc<Vec<Arc<DecodedParquetFile>>>,
+    /// Parquet catalog information
+    pub files: Arc<Vec<Arc<ParquetFile>>>,
 }
 
 impl CachedParquetFiles {
-    fn new(parquet_files_with_metadata: Vec<ParquetFileWithMetadata>) -> Self {
-        let files: Vec<_> = parquet_files_with_metadata
-            .into_iter()
-            .map(DecodedParquetFile::new)
-            .map(Arc::new)
-            .collect();
+    fn new(parquet_files: Vec<ParquetFile>) -> Self {
+        let files: Vec<_> = parquet_files.into_iter().map(Arc::new).collect();
 
         Self {
             files: Arc::new(files),
@@ -51,7 +46,7 @@ impl CachedParquetFiles {
     }
 
     /// return the underlying files as a new Vec
-    pub fn vec(&self) -> Vec<Arc<DecodedParquetFile>> {
+    pub fn vec(&self) -> Vec<Arc<ParquetFile>> {
         self.files.as_ref().clone()
     }
 
@@ -72,22 +67,19 @@ impl CachedParquetFiles {
         mem::size_of_val(self) +
         // Vec overhead
         mem::size_of_val(self.files.as_ref()) +
-        // size of the underlying decoded parquet files
+        // size of the underlying parquet files
        self.files.iter().map(|f| f.size()).sum::<usize>()
     }
 
     /// Returns the greatest parquet sequence number stored in this cache entry
     pub(crate) fn max_parquet_sequence_number(&self) -> Option<SequenceNumber> {
-        self.files
-            .iter()
-            .map(|f| f.parquet_file.max_sequence_number)
-            .max()
+        self.files.iter().map(|f| f.max_sequence_number).max()
     }
 }
 
 type CacheT = Box<dyn Cache<K = TableId, V = Arc<CachedParquetFiles>, Extra = ()>>;
 
-/// Cache for parquet file information with metadata.
+/// Cache for parquet file information.
 ///
 /// DOES NOT CACHE the actual parquet bytes from object store
 #[derive(Debug)]
@@ -126,17 +118,16 @@ impl ParquetFileCache {
                         // 2. track time ranges needed for queries and
                         //    limit files fetched to what is actually
                         //    needed
-                        let parquet_files_with_metadata: Vec<_> = catalog
+                        let parquet_files: Vec<_> = catalog
                             .repositories()
                             .await
                             .parquet_files()
-                            .list_by_table_not_to_delete_with_metadata(table_id)
+                            .list_by_table_not_to_delete(table_id)
                             .await
                             .context(CatalogSnafu)?;
 
-                        Ok(Arc::new(CachedParquetFiles::new(
-                            parquet_files_with_metadata,
-                        ))) as std::result::Result<_, Error>
+                        Ok(Arc::new(CachedParquetFiles::new(parquet_files)))
+                            as std::result::Result<_, Error>
                     })
                     .await
                     .expect("retry forever")
@@ -234,11 +225,10 @@ mod tests {
     use super::*;
     use data_types::{ParquetFile, ParquetFileId};
     use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFile, TestPartition, TestTable};
-    use test_helpers::assert_close;
 
     use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
 
-    const METRIC_NAME: &str = "parquet_list_by_table_not_to_delete_with_metadata";
+    const METRIC_NAME: &str = "parquet_list_by_table_not_to_delete";
 
     #[tokio::test]
     async fn test_parquet_chunks() {
@@ -250,7 +240,7 @@
         assert_eq!(cached_files.len(), 1);
 
         let expected_parquet_file = to_file(tfile);
-        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+        assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
 
         // validate a second request doesn't result in a catalog request
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
@@ -274,12 +264,12 @@
         let cached_files = cache.get(table1.table.id).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = to_file(tfile1);
-        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+        assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
 
         let cached_files = cache.get(table2.table.id).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = to_file(tfile2);
-        assert_eq!(cached_files[0].parquet_file, expected_parquet_file);
+        assert_eq!(cached_files[0].as_ref(), &expected_parquet_file);
     }
 
     #[tokio::test]
@@ -301,24 +291,19 @@
         partition.create_parquet_file("table1 foo=1 11").await;
         let table_id = table.table.id;
 
-        // expect these sizes change with sizes of parquet and
-        // its metadata (as the metadata is thrift encoded and
-        // includes timestamps, etc)
-        let slop_budget = 10;
-
-        let single_file_size = 1208;
-        let two_file_size = 2383;
+        let single_file_size = 247;
+        let two_file_size = 462;
         assert!(single_file_size < two_file_size);
 
         let cache = make_cache(&catalog);
         let cached_files = cache.get(table_id).await;
-        assert_close!(cached_files.size(), single_file_size, slop_budget);
+        assert_eq!(cached_files.size(), single_file_size);
 
         // add a second file, and force the cache to find it
         partition.create_parquet_file("table1 foo=1 11").await;
         cache.expire(table_id);
         let cached_files = cache.get(table_id).await;
-        assert_close!(cached_files.size(), two_file_size, slop_budget);
+        assert_eq!(cached_files.size(), two_file_size);
     }
 
     #[tokio::test]
@@ -452,7 +437,7 @@
 
     impl ParquetIds for &CachedParquetFiles {
         fn ids(&self) -> HashSet<ParquetFileId> {
-            self.files.iter().map(|f| f.parquet_file.id).collect()
+            self.files.iter().map(|f| f.id).collect()
         }
     }
 
diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs
index d47329a897..aeead61612 100644
--- a/querier/src/table/state_reconciler.rs
+++ b/querier/src/table/state_reconciler.rs
@@ -106,7 +106,7 @@ impl Reconciler {
         let parquet_files = filter_parquet_files(ingester_partitions, parquet_files.vec())?;
 
         debug!(
-            parquet_ids=?parquet_files.iter().map(|f| f.parquet_file.id).collect::<Vec<_>>(),
+            parquet_ids=?parquet_files.iter().map(|f| f.id).collect::<Vec<_>>(),
             namespace=%self.namespace_name(),
             table_name=%self.table_name(),
             "Parquet files after filtering"
@@ -117,10 +117,7 @@ impl Reconciler {
         for cached_parquet_file in parquet_files {
             if let Some(chunk) = self
                 .chunk_adapter
-                .new_rb_chunk(
-                    Arc::clone(&self.namespace_name),
-                    Arc::new(cached_parquet_file.parquet_file.clone()),
-                )
+                .new_rb_chunk(Arc::clone(&self.namespace_name), cached_parquet_file)
                 .await
             {
                 chunks_from_parquet.push(chunk);
diff --git a/querier/src/table/state_reconciler/interface.rs b/querier/src/table/state_reconciler/interface.rs
index fe763c25a5..e7ee9cddb9 100644
--- a/querier/src/table/state_reconciler/interface.rs
+++ b/querier/src/table/state_reconciler/interface.rs
@@ -1,8 +1,7 @@
 //! Interface for reconciling Ingester and catalog state
 
 use crate::ingester::IngesterPartition;
-use data_types::{PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId};
-use parquet_file::chunk::DecodedParquetFile;
+use data_types::{ParquetFile, PartitionId, SequenceNumber, SequencerId, Tombstone, TombstoneId};
 use std::{ops::Deref, sync::Arc};
 
 /// Information about an ingester partition.
@@ -56,24 +55,24 @@ where
 
 /// Information about a parquet file.
 ///
-/// This is mostly the same as [`DecodedParquetFile`] but allows easier mocking.
+/// This is mostly the same as [`ParquetFile`] but allows easier mocking.
 pub trait ParquetFileInfo {
     fn partition_id(&self) -> PartitionId;
     fn min_sequence_number(&self) -> SequenceNumber;
     fn max_sequence_number(&self) -> SequenceNumber;
 }
 
-impl ParquetFileInfo for Arc<DecodedParquetFile> {
+impl ParquetFileInfo for Arc<ParquetFile> {
     fn partition_id(&self) -> PartitionId {
-        self.parquet_file.partition_id
+        self.partition_id
     }
 
     fn min_sequence_number(&self) -> SequenceNumber {
-        self.parquet_file.min_sequence_number
+        self.min_sequence_number
    }
 
     fn max_sequence_number(&self) -> SequenceNumber {
-        self.parquet_file.max_sequence_number
+        self.max_sequence_number
     }
 }
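
Note on `max_parquet_sequence_number()`: the querier compares this watermark against ingester state to decide whether a cached catalog listing is still current. A minimal sketch of that kind of staleness check, assuming `CachedParquetFiles` is in scope and a hypothetical `persisted` watermark is reported by the ingester (the `is_stale` helper is illustrative, not part of this change):

    use data_types::SequenceNumber;

    // Hypothetical helper: the cached listing is behind if the ingester has
    // persisted data past the newest file the cache knows about.
    fn is_stale(cached: &CachedParquetFiles, persisted: Option<SequenceNumber>) -> bool {
        match (cached.max_parquet_sequence_number(), persisted) {
            // Nothing persisted yet: the cache cannot be behind.
            (_, None) => false,
            // The ingester persisted something, but the cache holds no files at all.
            (None, Some(_)) => true,
            // Behind if the newest cached file predates the persist watermark.
            (Some(cached_max), Some(persisted_max)) => cached_max < persisted_max,
        }
    }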
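Similarly, the `ParquetFileInfo` trait is kept so reconciler tests do not need real catalog rows: any type that reports the three identifiers can stand in for an `Arc<ParquetFile>`. A sketch of such a test double, assuming the trait is in scope (`MockFileInfo` and its fields are hypothetical, not part of the diff):

    use data_types::{PartitionId, SequenceNumber};

    // Hypothetical test double carrying just the three fields the
    // reconciler's filtering logic actually reads.
    #[derive(Debug, Clone, Copy)]
    struct MockFileInfo {
        partition_id: PartitionId,
        min_sequence_number: SequenceNumber,
        max_sequence_number: SequenceNumber,
    }

    impl ParquetFileInfo for MockFileInfo {
        fn partition_id(&self) -> PartitionId {
            self.partition_id
        }

        fn min_sequence_number(&self) -> SequenceNumber {
            self.min_sequence_number
        }

        fn max_sequence_number(&self) -> SequenceNumber {
            self.max_sequence_number
        }
    }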