refactor: move `read_schema_from_parquet_metadata` back to `parquet_file::metadata`

Let us pool all metadata handling in a single module, which makes it
easier to review.
pull/24376/head
Marco Neumann 2021-05-19 18:16:21 +02:00
parent ac83d99f66
commit fe8e6301fe
5 changed files with 49 additions and 48 deletions
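For orientation, a minimal before/after sketch of what the move means at call sites. The module paths are taken verbatim from the hunks below; nothing in this sketch is part of the commit itself:

// Before this commit the schema reader lived next to the object-store code:
// use parquet_file::storage::read_schema_from_parquet_metadata;
// After this commit it sits with the other metadata readers:
use parquet_file::metadata::read_schema_from_parquet_metadata;
// The function body and signature are moved verbatim, so callers only swap the import path.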

View File

@@ -140,7 +140,7 @@ pub enum Error {
         visibility(pub)
     )]
     SchemaReadFailed {
-        source: crate::storage::Error,
+        source: crate::metadata::Error,
         path: DirsAndFileName,
     },
@@ -1059,8 +1059,10 @@ pub mod tests {
     use std::{num::NonZeroU32, ops::Deref};

     use crate::{
-        metadata::{read_parquet_metadata_from_file, read_statistics_from_parquet_metadata},
-        storage::read_schema_from_parquet_metadata,
+        metadata::{
+            read_parquet_metadata_from_file, read_schema_from_parquet_metadata,
+            read_statistics_from_parquet_metadata,
+        },
         utils::{load_parquet_from_store, make_chunk, make_object_store},
     };
     use object_store::parsed_path;

View File

@@ -86,13 +86,14 @@
 //! [Apache Parquet]: https://parquet.apache.org/
 //! [Apache Thrift]: https://thrift.apache.org/
 //! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
-use std::sync::Arc;
+use std::{convert::TryInto, sync::Arc};

 use data_types::partition_metadata::{
     ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary,
 };
 use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
 use parquet::{
+    arrow::parquet_to_arrow_schema,
     file::{
         metadata::{
             FileMetaData as ParquetFileMetaData, ParquetMetaData,
@@ -177,6 +178,16 @@ pub enum Error {
         column: String,
         source: parquet::errors::ParquetError,
     },
+
+    #[snafu(display("Cannot read arrow schema from parquet: {}", source))]
+    ArrowFromParquetFailure {
+        source: parquet::errors::ParquetError,
+    },
+
+    #[snafu(display("Cannot read IOx schema from arrow: {}", source))]
+    IoxFromArrowFailure {
+        source: internal_types::schema::Error,
+    },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -187,6 +198,24 @@ pub fn read_parquet_metadata_from_file(data: Vec<u8>) -> Result<ParquetMetaData>
     Ok(reader.metadata().clone())
 }

+/// Read IOx schema from parquet metadata.
+pub fn read_schema_from_parquet_metadata(parquet_md: &ParquetMetaData) -> Result<Schema> {
+    let file_metadata = parquet_md.file_metadata();
+
+    let arrow_schema = parquet_to_arrow_schema(
+        file_metadata.schema_descr(),
+        file_metadata.key_value_metadata(),
+    )
+    .context(ArrowFromParquetFailure {})?;
+
+    let arrow_schema_ref = Arc::new(arrow_schema);
+
+    let schema: Schema = arrow_schema_ref
+        .try_into()
+        .context(IoxFromArrowFailure {})?;
+
+    Ok(schema)
+}
+
 /// Read IOx statistics (including timestamp range) from parquet metadata.
 pub fn read_statistics_from_parquet_metadata(
     parquet_md: &ParquetMetaData,
@@ -444,9 +473,8 @@ mod tests {
     use internal_types::{schema::TIME_COLUMN_NAME, selection::Selection};

-    use crate::{
-        storage::read_schema_from_parquet_metadata,
-        utils::{load_parquet_from_store, make_chunk, make_chunk_no_row_group, make_object_store},
+    use crate::utils::{
+        load_parquet_from_store, make_chunk, make_chunk_no_row_group, make_object_store,
     };

     #[tokio::test]
View File

@@ -9,16 +9,14 @@ use datafusion::{
     physical_optimizer::pruning::PruningPredicateBuilder,
     physical_plan::{common::SizedRecordBatchStream, RecordBatchStream, SendableRecordBatchStream},
 };
-use internal_types::{schema::Schema, selection::Selection};
+use internal_types::selection::Selection;
 use object_store::{
     path::{parsed::DirsAndFileName, ObjectStorePath, Path},
     ObjectStore, ObjectStoreApi,
 };
 use parquet::{
     self,
-    arrow::{
-        arrow_reader::ParquetFileArrowReader, parquet_to_arrow_schema, ArrowReader, ArrowWriter,
-    },
+    arrow::{arrow_reader::ParquetFileArrowReader, ArrowReader, ArrowWriter},
     file::{
         metadata::{ParquetMetaData, RowGroupMetaData},
         reader::FileReader,
@@ -34,14 +32,13 @@ use futures::{Stream, StreamExt, TryStreamExt};
 use parking_lot::Mutex;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
-    convert::TryInto,
     io::{Cursor, Seek, SeekFrom, Write},
     sync::Arc,
     task::{Context, Poll},
 };
 use tokio_stream::wrappers::ReceiverStream;

-use crate::metadata::read_parquet_metadata_from_file;
+use crate::metadata::{read_parquet_metadata_from_file, read_schema_from_parquet_metadata};

 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -101,16 +98,6 @@ pub enum Error {
         source: datafusion::error::DataFusionError,
     },

-    #[snafu(display("Cannot read arrow schema from parquet: {}", source))]
-    ArrowFromParquetFailure {
-        source: parquet::errors::ParquetError,
-    },
-
-    #[snafu(display("Cannot read IOx schema from arrow: {}", source))]
-    IoxFromArrowFailure {
-        source: internal_types::schema::Error,
-    },
-
     #[snafu(display("Cannot extract Parquet metadata from byte array: {}", source))]
     ExtractingMetadataFailure { source: crate::metadata::Error },
@@ -377,7 +364,8 @@ impl Storage {
         // TODO: remove these line after https://github.com/apache/arrow-rs/issues/252 is done
         // Get file level metadata to set it to the record batch's metadata below
         let metadata = reader.metadata();
-        let schema = read_schema_from_parquet_metadata(metadata)?;
+        let schema =
+            read_schema_from_parquet_metadata(metadata).context(ExtractingMetadataFailure)?;

         if let Some(predicate_builder) = predicate_builder {
             let predicate_values = predicate_builder
@@ -489,24 +477,6 @@ impl TryClone for MemWriter {
     }
 }

-/// Read IOx schema from parquet metadata.
-pub fn read_schema_from_parquet_metadata(parquet_md: &ParquetMetaData) -> Result<Schema> {
-    let file_metadata = parquet_md.file_metadata();
-
-    let arrow_schema = parquet_to_arrow_schema(
-        file_metadata.schema_descr(),
-        file_metadata.key_value_metadata(),
-    )
-    .context(ArrowFromParquetFailure {})?;
-
-    let arrow_schema_ref = Arc::new(arrow_schema);
-
-    let schema: Schema = arrow_schema_ref
-        .try_into()
-        .context(IoxFromArrowFailure {})?;
-
-    Ok(schema)
-}
-
 #[cfg(test)]
 mod tests {
     use std::num::NonZeroU32;

View File

@@ -10,8 +10,10 @@ mod tests {
     use data_types::partition_metadata::TableSummary;

     use crate::{
-        metadata::{read_parquet_metadata_from_file, read_statistics_from_parquet_metadata},
-        storage::read_schema_from_parquet_metadata,
+        metadata::{
+            read_parquet_metadata_from_file, read_schema_from_parquet_metadata,
+            read_statistics_from_parquet_metadata,
+        },
         utils::*,
     };

View File

@@ -33,8 +33,8 @@ use parking_lot::{Mutex, RwLock};
 use parquet_file::{
     catalog::{CatalogParquetInfo, CatalogState, PreservedCatalog},
     chunk::{Chunk as ParquetChunk, ChunkMetrics as ParquetChunkMetrics},
-    metadata::read_statistics_from_parquet_metadata,
-    storage::{read_schema_from_parquet_metadata, Storage},
+    metadata::{read_schema_from_parquet_metadata, read_statistics_from_parquet_metadata},
+    storage::Storage,
 };
 use query::predicate::{Predicate, PredicateBuilder};
 use query::{exec::Executor, Database, DEFAULT_SCHEMA};
@@ -1267,8 +1267,7 @@ mod tests {
         ObjectStore, ObjectStoreApi,
     };
     use parquet_file::{
-        metadata::read_parquet_metadata_from_file,
-        storage::read_schema_from_parquet_metadata,
+        metadata::{read_parquet_metadata_from_file, read_schema_from_parquet_metadata},
         utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
     };
     use query::{frontend::sql::SqlQueryPlanner, PartitionChunk};