diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index 7f2141aaba..c99bd715a3 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -1,8 +1,12 @@
-use crate::{metadata::IoxParquetMetaData, storage::Storage};
+use crate::{
+    metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData},
+    storage::Storage,
+};
 use data_types::{
     partition_metadata::{Statistics, TableSummary},
     timestamp::{TimestampMinMax, TimestampRange},
 };
+use data_types2::ParquetFile;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use iox_object_store::{IoxObjectStore, ParquetFilePath};
 use predicate::Predicate;
@@ -286,3 +290,62 @@ fn extract_range(table_summary: &TableSummary) -> Option<TimestampMinMax> {
         None
     })
 }
+/// Parquet file with decoded metadata.
+#[derive(Debug)]
+pub struct DecodedParquetFile {
+    pub parquet_file: ParquetFile,
+    pub parquet_metadata: Arc<IoxParquetMetaData>,
+    pub decoded_metadata: DecodedIoxParquetMetaData,
+    pub iox_metadata: IoxMetadata,
+}
+
+impl DecodedParquetFile {
+    pub fn new(parquet_file: ParquetFile) -> Self {
+        let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(
+            parquet_file.parquet_metadata.clone(),
+        ));
+        let decoded_metadata = parquet_metadata.decode().expect("parquet metadata broken");
+        let iox_metadata = decoded_metadata
+            .read_iox_metadata_new()
+            .expect("cannot read IOx metadata from parquet MD");
+
+        Self {
+            parquet_file,
+            parquet_metadata,
+            decoded_metadata,
+            iox_metadata,
+        }
+    }
+}
+
+/// Create parquet chunk.
+pub fn new_parquet_chunk(
+    decoded_parquet_file: &DecodedParquetFile,
+    table_name: Arc<str>,
+    partition_key: Arc<str>, // old partition key format
+    metrics: ChunkMetrics,
+    iox_object_store: Arc<IoxObjectStore>,
+) -> ParquetChunk {
+    let iox_metadata = &decoded_parquet_file.iox_metadata;
+    let path = ParquetFilePath::new_new_gen(
+        iox_metadata.namespace_id,
+        iox_metadata.table_id,
+        iox_metadata.sequencer_id,
+        iox_metadata.partition_id,
+        iox_metadata.object_store_id,
+    );
+
+    let parquet_file = &decoded_parquet_file.parquet_file;
+    let file_size_bytes = parquet_file.file_size_bytes as usize;
+
+    ParquetChunk::new(
+        &path,
+        iox_object_store,
+        file_size_bytes,
+        Arc::clone(&decoded_parquet_file.parquet_metadata),
+        table_name,
+        partition_key,
+        metrics,
+    )
+    .expect("cannot create chunk")
+}
diff --git a/querier/src/chunk.rs b/querier/src/chunk.rs
index cd6b056aff..4fdb2f91ce 100644
--- a/querier/src/chunk.rs
+++ b/querier/src/chunk.rs
@@ -1,43 +1,15 @@
 use crate::cache::CatalogCache;
 use data_types2::{ChunkAddr, ChunkId, ChunkOrder, ParquetFile};
 use db::catalog::chunk::{CatalogChunk, ChunkMetadata, ChunkMetrics as CatalogChunkMetrics};
-use iox_object_store::{IoxObjectStore, ParquetFilePath};
+use iox_object_store::IoxObjectStore;
 use object_store::ObjectStore;
-use parquet_file::{
-    chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
-    metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData},
+use parquet_file::chunk::{
+    new_parquet_chunk, ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk,
 };
 use std::sync::Arc;
 use time::TimeProvider;
 use uuid::Uuid;
 
-/// Parquet file with decoded metadata.
-struct DecodedParquetFile {
-    parquet_file: ParquetFile,
-    parquet_metadata: Arc<IoxParquetMetaData>,
-    decoded_metadata: DecodedIoxParquetMetaData,
-    iox_metadata: IoxMetadata,
-}
-
-impl DecodedParquetFile {
-    fn new(parquet_file: ParquetFile) -> Self {
-        let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(
-            parquet_file.parquet_metadata.clone(),
-        ));
-        let decoded_metadata = parquet_metadata.decode().expect("parquet metadata broken");
-        let iox_metadata = decoded_metadata
-            .read_iox_metadata_new()
-            .expect("cannot read IOx metadata from parquet MD");
-
-        Self {
-            parquet_file,
-            parquet_metadata,
-            decoded_metadata,
-            iox_metadata,
-        }
-    }
-}
-
 /// Adapter that can create old-gen chunks for the new-gen catalog.
 #[derive(Debug)]
 pub struct ParquetChunkAdapter {
@@ -78,17 +50,7 @@ impl ParquetChunkAdapter {
     /// Create parquet chunk.
     async fn new_parquet_chunk(&self, decoded_parquet_file: &DecodedParquetFile) -> ParquetChunk {
-        let iox_metadata = &decoded_parquet_file.iox_metadata;
-        let path = ParquetFilePath::new_new_gen(
-            iox_metadata.namespace_id,
-            iox_metadata.table_id,
-            iox_metadata.sequencer_id,
-            iox_metadata.partition_id,
-            iox_metadata.object_store_id,
-        );
-
         let parquet_file = &decoded_parquet_file.parquet_file;
-        let file_size_bytes = parquet_file.file_size_bytes as usize;
         let table_name = self.catalog_cache.table_name(parquet_file.table_id).await;
         let partition_key = self
             .catalog_cache
             .await;
@@ -96,16 +58,13 @@ impl ParquetChunkAdapter {
         let metrics = ParquetChunkMetrics::new(self.metric_registry.as_ref());
 
-        ParquetChunk::new(
-            &path,
-            Arc::clone(&self.iox_object_store),
-            file_size_bytes,
-            Arc::clone(&decoded_parquet_file.parquet_metadata),
+        new_parquet_chunk(
+            decoded_parquet_file,
             table_name,
             partition_key,
             metrics,
+            Arc::clone(&self.iox_object_store),
         )
-        .expect("cannot create chunk")
     }
 
     /// Create all components to create a catalog chunk using