refactor: Add more methods to DecodedParquetFile
I'm tired of trying to remember which info is on which metadata.
pull/24376/head
parent
9e30a3eb29
commit
054c25de50
|
@ -6,11 +6,12 @@ use crate::{
|
|||
storage::ParquetStorage,
|
||||
};
|
||||
use data_types::{
|
||||
ParquetFile, ParquetFileWithMetadata, TableSummary, TimestampMinMax, TimestampRange,
|
||||
ParquetFile, ParquetFileId, ParquetFileWithMetadata, PartitionId, SequenceNumber, SequencerId,
|
||||
TableId, TableSummary, TimestampMinMax, TimestampRange,
|
||||
};
|
||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use predicate::Predicate;
|
||||
use schema::{selection::Selection, Schema};
|
||||
use schema::{selection::Selection, sort::SortKey, Schema};
|
||||
use std::{collections::BTreeSet, mem, sync::Arc};
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -214,6 +215,46 @@ impl DecodedParquetFile {
|
|||
}
|
||||
}
|
||||
|
||||
/// The IOx schema from the decoded IOx parquet metadata
|
||||
pub fn schema(&self) -> Arc<Schema> {
|
||||
self.decoded_metadata.read_schema().unwrap()
|
||||
}
|
||||
|
||||
/// The IOx parquet file ID
|
||||
pub fn parquet_file_id(&self) -> ParquetFileId {
|
||||
self.parquet_file.id
|
||||
}
|
||||
|
||||
/// The IOx partition ID
|
||||
pub fn partition_id(&self) -> PartitionId {
|
||||
self.parquet_file.partition_id
|
||||
}
|
||||
|
||||
/// The IOx sequencer ID
|
||||
pub fn sequencer_id(&self) -> SequencerId {
|
||||
self.iox_metadata.sequencer_id
|
||||
}
|
||||
|
||||
/// The IOx table ID
|
||||
pub fn table_id(&self) -> TableId {
|
||||
self.parquet_file.table_id
|
||||
}
|
||||
|
||||
/// The sort key from the IOx metadata
|
||||
pub fn sort_key(&self) -> Option<&SortKey> {
|
||||
self.iox_metadata.sort_key.as_ref()
|
||||
}
|
||||
|
||||
/// The minimum sequence number in this file
|
||||
pub fn min_sequence_number(&self) -> SequenceNumber {
|
||||
self.parquet_file.min_sequence_number
|
||||
}
|
||||
|
||||
/// The maximum sequence number in this file
|
||||
pub fn max_sequence_number(&self) -> SequenceNumber {
|
||||
self.parquet_file.max_sequence_number
|
||||
}
|
||||
|
||||
/// Estimate the memory consumption of this object and its contents
|
||||
pub fn size(&self) -> usize {
|
||||
// note: subtract size of non Arc'd members as they are
|
||||
|
|
|
@ -328,42 +328,38 @@ impl ChunkAdapter {
|
|||
&self,
|
||||
decoded_parquet_file: &DecodedParquetFile,
|
||||
) -> Option<QuerierParquetChunk> {
|
||||
let parquet_file = &decoded_parquet_file.parquet_file;
|
||||
let chunk = Arc::new(self.new_parquet_chunk(decoded_parquet_file).await?);
|
||||
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
|
||||
let table_name = self
|
||||
.catalog_cache
|
||||
.table()
|
||||
.name(parquet_file.table_id)
|
||||
.await?;
|
||||
let parquet_file_id = decoded_parquet_file.parquet_file_id();
|
||||
let table_id = decoded_parquet_file.table_id();
|
||||
|
||||
let iox_metadata = &decoded_parquet_file.iox_metadata;
|
||||
let chunk = Arc::new(self.new_parquet_chunk(decoded_parquet_file).await?);
|
||||
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file_id.get() as _));
|
||||
let table_name = self.catalog_cache.table().name(table_id).await?;
|
||||
|
||||
// Somewhat hacky workaround because of implicit chunk orders, use min sequence number and
|
||||
// hope it doesn't overflow u32. Order is non-zero, so we need to add 1.
|
||||
let order = ChunkOrder::new(1 + iox_metadata.min_sequence_number.get() as u32)
|
||||
let order = ChunkOrder::new(1 + decoded_parquet_file.min_sequence_number().get() as u32)
|
||||
.expect("cannot be zero");
|
||||
|
||||
// Read partition sort key
|
||||
let partition_sort_key = self
|
||||
.catalog_cache()
|
||||
.partition()
|
||||
.sort_key(iox_metadata.partition_id)
|
||||
.sort_key(decoded_parquet_file.partition_id())
|
||||
.await;
|
||||
|
||||
let meta = Arc::new(ChunkMeta {
|
||||
chunk_id,
|
||||
table_name,
|
||||
order,
|
||||
sort_key: iox_metadata.sort_key.clone(),
|
||||
sequencer_id: iox_metadata.sequencer_id,
|
||||
partition_id: iox_metadata.partition_id,
|
||||
min_sequence_number: parquet_file.min_sequence_number,
|
||||
max_sequence_number: parquet_file.max_sequence_number,
|
||||
sort_key: decoded_parquet_file.sort_key().cloned(),
|
||||
sequencer_id: decoded_parquet_file.sequencer_id(),
|
||||
partition_id: decoded_parquet_file.partition_id(),
|
||||
min_sequence_number: decoded_parquet_file.min_sequence_number(),
|
||||
max_sequence_number: decoded_parquet_file.max_sequence_number(),
|
||||
});
|
||||
|
||||
Some(QuerierParquetChunk::new(
|
||||
parquet_file.id,
|
||||
parquet_file_id,
|
||||
chunk,
|
||||
meta,
|
||||
partition_sort_key,
|
||||
|
@ -375,7 +371,14 @@ impl ChunkAdapter {
|
|||
&self,
|
||||
decoded_parquet_file: &DecodedParquetFile,
|
||||
) -> Option<QuerierRBChunk> {
|
||||
let parquet_file = &decoded_parquet_file.parquet_file;
|
||||
let parquet_file_id = decoded_parquet_file.parquet_file_id();
|
||||
let schema = decoded_parquet_file.schema();
|
||||
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file_id.get() as _));
|
||||
let table_name = self
|
||||
.catalog_cache
|
||||
.table()
|
||||
.name(decoded_parquet_file.table_id())
|
||||
.await?;
|
||||
|
||||
let rb_chunk = self
|
||||
.catalog_cache()
|
||||
|
@ -383,47 +386,31 @@ impl ChunkAdapter {
|
|||
.get(decoded_parquet_file)
|
||||
.await;
|
||||
|
||||
let decoded = decoded_parquet_file
|
||||
.parquet_metadata
|
||||
.as_ref()
|
||||
.decode()
|
||||
.unwrap();
|
||||
let schema = decoded.read_schema().unwrap();
|
||||
|
||||
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _));
|
||||
let table_name = self
|
||||
.catalog_cache
|
||||
.table()
|
||||
.name(parquet_file.table_id)
|
||||
.await?;
|
||||
|
||||
let iox_metadata = &decoded_parquet_file.iox_metadata;
|
||||
|
||||
// Somewhat hacky workaround because of implicit chunk orders, use min sequence number and
|
||||
// hope it doesn't overflow u32. Order is non-zero, so we need to add 1.
|
||||
let order = ChunkOrder::new(1 + iox_metadata.min_sequence_number.get() as u32)
|
||||
let order = ChunkOrder::new(1 + decoded_parquet_file.min_sequence_number().get() as u32)
|
||||
.expect("cannot be zero");
|
||||
|
||||
// Read partition sort key
|
||||
let partition_sort_key = self
|
||||
.catalog_cache()
|
||||
.partition()
|
||||
.sort_key(iox_metadata.partition_id)
|
||||
.sort_key(decoded_parquet_file.partition_id())
|
||||
.await;
|
||||
|
||||
let meta = Arc::new(ChunkMeta {
|
||||
chunk_id,
|
||||
table_name,
|
||||
order,
|
||||
sort_key: iox_metadata.sort_key.clone(),
|
||||
sequencer_id: iox_metadata.sequencer_id,
|
||||
partition_id: iox_metadata.partition_id,
|
||||
min_sequence_number: parquet_file.min_sequence_number,
|
||||
max_sequence_number: parquet_file.max_sequence_number,
|
||||
sort_key: decoded_parquet_file.sort_key().cloned(),
|
||||
sequencer_id: decoded_parquet_file.sequencer_id(),
|
||||
partition_id: decoded_parquet_file.partition_id(),
|
||||
min_sequence_number: decoded_parquet_file.min_sequence_number(),
|
||||
max_sequence_number: decoded_parquet_file.max_sequence_number(),
|
||||
});
|
||||
|
||||
Some(QuerierRBChunk::new(
|
||||
parquet_file.id,
|
||||
parquet_file_id,
|
||||
rb_chunk,
|
||||
meta,
|
||||
schema,
|
||||
|
|
Loading…
Reference in New Issue