diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs
index a578fb2ff9..c7c0c783ee 100644
--- a/compactor/src/utils.rs
+++ b/compactor/src/utils.rs
@@ -8,7 +8,7 @@ use data_types::{
 use object_store::DynObjectStore;
 use observability_deps::tracing::*;
 use parquet_file::{
-    chunk::{new_parquet_chunk, ChunkMetrics, DecodedParquetFile},
+    chunk::{ChunkMetrics, DecodedParquetFile, ParquetChunk},
     metadata::{IoxMetadata, IoxParquetMetaData},
 };
 use schema::sort::SortKey;
@@ -94,7 +94,7 @@ impl ParquetFileWithTombstone {
     ) -> QueryableParquetChunk {
         let decoded_parquet_file = DecodedParquetFile::new((*self.data).clone());
 
-        let parquet_chunk = new_parquet_chunk(
+        let parquet_chunk = ParquetChunk::new(
             &decoded_parquet_file,
             ChunkMetrics::new_unregistered(), // TODO: need to add metrics
             object_store,
diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index c06f70f5f4..68a7ebcc54 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -107,61 +107,41 @@ pub struct ParquetChunk {
 }
 
 impl ParquetChunk {
-    /// Creates new chunk from given parquet metadata.
+    /// Create parquet chunk.
     pub fn new(
-        path: &ParquetFilePath,
-        object_store: Arc<DynObjectStore>,
-        file_size_bytes: usize,
-        parquet_metadata: Arc<IoxParquetMetaData>,
+        decoded_parquet_file: &DecodedParquetFile,
         metrics: ChunkMetrics,
-    ) -> Result<Self> {
-        let decoded = parquet_metadata
+        object_store: Arc<DynObjectStore>,
+    ) -> ParquetChunk {
+        let iox_metadata = &decoded_parquet_file.iox_metadata;
+        let path = ParquetFilePath::new(
+            iox_metadata.namespace_id,
+            iox_metadata.table_id,
+            iox_metadata.sequencer_id,
+            iox_metadata.partition_id,
+            iox_metadata.object_store_id,
+        );
+
+        let decoded = decoded_parquet_file
+            .parquet_metadata
+            .as_ref()
             .decode()
-            .context(MetadataDecodeFailedSnafu { path })?;
-        let schema = decoded
-            .read_schema()
-            .context(SchemaReadFailedSnafu { path })?;
-        let columns = decoded
-            .read_statistics(&schema)
-            .context(StatisticsReadFailedSnafu { path })?;
+            .unwrap();
+        let schema = decoded.read_schema().unwrap();
+        let columns = decoded.read_statistics(&schema).unwrap();
         let table_summary = TableSummary { columns };
         let rows = decoded.row_count();
-
-        Ok(Self::new_from_parts(
-            Arc::new(table_summary),
-            schema,
-            path,
-            object_store,
-            file_size_bytes,
-            parquet_metadata,
-            rows,
-            metrics,
-        ))
-    }
-
-    /// Creates a new chunk from given parts w/o parsing anything from the provided parquet
-    /// metadata.
-    #[allow(clippy::too_many_arguments)]
-    pub(crate) fn new_from_parts(
-        table_summary: Arc<TableSummary>,
-        schema: Arc<Schema>,
-        path: &ParquetFilePath,
-        object_store: Arc<DynObjectStore>,
-        file_size_bytes: usize,
-        parquet_metadata: Arc<IoxParquetMetaData>,
-        rows: usize,
-        metrics: ChunkMetrics,
-    ) -> Self {
         let timestamp_min_max = extract_range(&table_summary);
+        let file_size_bytes = decoded_parquet_file.parquet_file.file_size_bytes as usize;
 
         Self {
-            table_summary,
+            table_summary: Arc::new(table_summary),
             schema,
             timestamp_min_max,
-            path: path.into(),
             object_store,
+            path,
             file_size_bytes,
-            parquet_metadata,
+            parquet_metadata: Arc::clone(&decoded_parquet_file.parquet_metadata),
             rows,
             metrics,
         }
@@ -299,31 +279,3 @@ impl DecodedParquetFile {
         }
     }
 }
-
-/// Create parquet chunk.
-pub fn new_parquet_chunk(
-    decoded_parquet_file: &DecodedParquetFile,
-    metrics: ChunkMetrics,
-    object_store: Arc<DynObjectStore>,
-) -> ParquetChunk {
-    let iox_metadata = &decoded_parquet_file.iox_metadata;
-    let path = ParquetFilePath::new(
-        iox_metadata.namespace_id,
-        iox_metadata.table_id,
-        iox_metadata.sequencer_id,
-        iox_metadata.partition_id,
-        iox_metadata.object_store_id,
-    );
-
-    let parquet_file = &decoded_parquet_file.parquet_file;
-    let file_size_bytes = parquet_file.file_size_bytes as usize;
-
-    ParquetChunk::new(
-        &path,
-        object_store,
-        file_size_bytes,
-        Arc::clone(&decoded_parquet_file.parquet_metadata),
-        metrics,
-    )
-    .expect("cannot create chunk")
-}
diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs
index fb8ba72010..1b1ea4daad 100644
--- a/querier/src/chunk/mod.rs
+++ b/querier/src/chunk/mod.rs
@@ -11,9 +11,7 @@ use iox_catalog::interface::Catalog;
 use iox_query::{exec::IOxSessionContext, QueryChunk};
 use iox_time::TimeProvider;
 use object_store::DynObjectStore;
-use parquet_file::chunk::{
-    new_parquet_chunk, ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk,
-};
+use parquet_file::chunk::{ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk};
 use schema::{selection::Selection, sort::SortKey};
 use std::sync::Arc;
 use uuid::Uuid;
@@ -217,7 +215,7 @@ impl ParquetChunkAdapter {
     ) -> Option<ParquetChunk> {
         let metrics = ParquetChunkMetrics::new(self.metric_registry.as_ref());
 
-        Some(new_parquet_chunk(
+        Some(ParquetChunk::new(
             decoded_parquet_file,
             metrics,
             Arc::clone(&self.object_store),
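
For reviewers, a minimal sketch of the consolidated API after this change. The `build_chunk` helper and the `data_types::ParquetFile` import are illustrative assumptions; the constructor call itself mirrors the compactor and querier call sites in the diff above:

```rust
use std::sync::Arc;

use data_types::ParquetFile; // assumed location of the catalog record type
use object_store::DynObjectStore;
use parquet_file::chunk::{ChunkMetrics, DecodedParquetFile, ParquetChunk};

/// Hypothetical call site: `parquet_file` is the catalog record for the
/// file and `object_store` the shared store handle, both caller-supplied.
fn build_chunk(parquet_file: ParquetFile, object_store: Arc<DynObjectStore>) -> ParquetChunk {
    // Decoding up front hands `ParquetChunk::new` everything it derives
    // internally: the IOx metadata (for the object-store path), the parquet
    // metadata (for schema and statistics), and the file size.
    let decoded_parquet_file = DecodedParquetFile::new(parquet_file);

    // `new_unregistered` mirrors the compactor call site in this diff; the
    // querier registers real metrics against its metric registry instead.
    ParquetChunk::new(
        &decoded_parquet_file,
        ChunkMetrics::new_unregistered(),
        object_store,
    )
}
```

Note that `ParquetChunk::new` now returns `ParquetChunk` directly rather than `Result<Self>`: the decode/read errors that previously surfaced via snafu contexts are unwrapped inside the constructor, matching the old `new_parquet_chunk` behavior of `.expect("cannot create chunk")`.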