refactor: move parquet chunk's new and decode to parquet_file crate (#3987)

pull/24376/head
Nga Tran 2022-03-08 17:04:32 -05:00 committed by GitHub
parent 96100635c3
commit c6cab3538f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 48 deletions

View File

@@ -1,8 +1,12 @@
use crate::{metadata::IoxParquetMetaData, storage::Storage};
use crate::{
metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData},
storage::Storage,
};
use data_types::{
partition_metadata::{Statistics, TableSummary},
timestamp::{TimestampMinMax, TimestampRange},
};
use data_types2::ParquetFile;
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use predicate::Predicate;
@@ -286,3 +290,62 @@ fn extract_range(table_summary: &TableSummary) -> Option<TimestampMinMax> {
None
})
}
/// Parquet file with decoded metadata.
#[derive(Debug)]
pub struct DecodedParquetFile {
    /// The underlying parquet file record (carries the thrift-encoded
    /// metadata bytes and the file size, among other things).
    pub parquet_file: ParquetFile,
    /// Encoded IOx parquet metadata, wrapped in an `Arc` so it can be
    /// shared cheaply (e.g. handed to a `ParquetChunk`).
    pub parquet_metadata: Arc<IoxParquetMetaData>,
    /// Decoded form of `parquet_metadata`.
    pub decoded_metadata: DecodedIoxParquetMetaData,
    /// IOx-specific metadata read out of `decoded_metadata`.
    pub iox_metadata: IoxMetadata,
}
impl DecodedParquetFile {
    /// Decode the thrift-encoded metadata embedded in `parquet_file` and
    /// bundle every representation (encoded, decoded, IOx-specific) together
    /// with the file itself.
    ///
    /// # Panics
    ///
    /// Panics if the embedded parquet metadata cannot be decoded, or if it
    /// does not contain readable IOx metadata.
    pub fn new(parquet_file: ParquetFile) -> Self {
        let encoded = IoxParquetMetaData::from_thrift_bytes(parquet_file.parquet_metadata.clone());

        let decoded = encoded.decode().expect("parquet metadata broken");

        let iox = decoded
            .read_iox_metadata_new()
            .expect("cannot read IOx metadata from parquet MD");

        Self {
            parquet_file,
            parquet_metadata: Arc::new(encoded),
            decoded_metadata: decoded,
            iox_metadata: iox,
        }
    }
}
/// Create parquet chunk.
pub fn new_parquet_chunk(
decoded_parquet_file: &DecodedParquetFile,
table_name: Arc<str>,
partition_key: Arc<str>, // old partition key format
metrics: ChunkMetrics,
iox_object_store: Arc<IoxObjectStore>,
) -> ParquetChunk {
let iox_metadata = &decoded_parquet_file.iox_metadata;
let path = ParquetFilePath::new_new_gen(
iox_metadata.namespace_id,
iox_metadata.table_id,
iox_metadata.sequencer_id,
iox_metadata.partition_id,
iox_metadata.object_store_id,
);
let parquet_file = &decoded_parquet_file.parquet_file;
let file_size_bytes = parquet_file.file_size_bytes as usize;
ParquetChunk::new(
&path,
iox_object_store,
file_size_bytes,
Arc::clone(&decoded_parquet_file.parquet_metadata),
table_name,
partition_key,
metrics,
)
.expect("cannot create chunk")
}

View File

@@ -1,43 +1,15 @@
use crate::cache::CatalogCache;
use data_types2::{ChunkAddr, ChunkId, ChunkOrder, ParquetFile};
use db::catalog::chunk::{CatalogChunk, ChunkMetadata, ChunkMetrics as CatalogChunkMetrics};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;
use parquet_file::{
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData},
use parquet_file::chunk::{
new_parquet_chunk, ChunkMetrics as ParquetChunkMetrics, DecodedParquetFile, ParquetChunk,
};
use std::sync::Arc;
use time::TimeProvider;
use uuid::Uuid;
/// Parquet file with decoded metadata.
struct DecodedParquetFile {
    // The underlying parquet file record (carries the thrift-encoded
    // metadata bytes).
    parquet_file: ParquetFile,
    // Encoded IOx parquet metadata, Arc-wrapped for cheap sharing.
    parquet_metadata: Arc<IoxParquetMetaData>,
    // Decoded form of `parquet_metadata`.
    decoded_metadata: DecodedIoxParquetMetaData,
    // IOx-specific metadata read out of `decoded_metadata`.
    iox_metadata: IoxMetadata,
}
impl DecodedParquetFile {
    /// Build a `DecodedParquetFile` by decoding the thrift-encoded parquet
    /// metadata embedded in `parquet_file`.
    ///
    /// Panics if the metadata cannot be decoded or lacks IOx metadata.
    fn new(parquet_file: ParquetFile) -> Self {
        let thrift_bytes = parquet_file.parquet_metadata.clone();
        let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(thrift_bytes));

        let decoded_metadata = parquet_metadata.decode().expect("parquet metadata broken");
        let iox_metadata = decoded_metadata
            .read_iox_metadata_new()
            .expect("cannot read IOx metadata from parquet MD");

        Self {
            parquet_file,
            parquet_metadata,
            decoded_metadata,
            iox_metadata,
        }
    }
}
/// Adapter that can create old-gen chunks for the new-gen catalog.
#[derive(Debug)]
pub struct ParquetChunkAdapter {
@@ -78,17 +50,7 @@ impl ParquetChunkAdapter {
/// Create parquet chunk.
async fn new_parquet_chunk(&self, decoded_parquet_file: &DecodedParquetFile) -> ParquetChunk {
let iox_metadata = &decoded_parquet_file.iox_metadata;
let path = ParquetFilePath::new_new_gen(
iox_metadata.namespace_id,
iox_metadata.table_id,
iox_metadata.sequencer_id,
iox_metadata.partition_id,
iox_metadata.object_store_id,
);
let parquet_file = &decoded_parquet_file.parquet_file;
let file_size_bytes = parquet_file.file_size_bytes as usize;
let table_name = self.catalog_cache.table_name(parquet_file.table_id).await;
let partition_key = self
.catalog_cache
@@ -96,16 +58,13 @@ impl ParquetChunkAdapter {
.await;
let metrics = ParquetChunkMetrics::new(self.metric_registry.as_ref());
ParquetChunk::new(
&path,
Arc::clone(&self.iox_object_store),
file_size_bytes,
Arc::clone(&decoded_parquet_file.parquet_metadata),
new_parquet_chunk(
decoded_parquet_file,
table_name,
partition_key,
metrics,
Arc::clone(&self.iox_object_store),
)
.expect("cannot create chunk")
}
/// Create all components to create a catalog chunk using