refactor: remove InputReaderAdapter

pull/24376/head
alamb 2020-06-23 17:15:02 -04:00
parent 68ce351a3a
commit b22423621b
6 changed files with 41 additions and 72 deletions

View File

@ -2,55 +2,13 @@
#![deny(rust_2018_idioms)] #![deny(rust_2018_idioms)]
#![warn(missing_debug_implementations, clippy::explicit_iter_loop)] #![warn(missing_debug_implementations, clippy::explicit_iter_loop)]
// Export the parts of
pub use parquet::{ pub use parquet::{
errors::ParquetError, errors::ParquetError,
file::reader::{Length, TryClone}, file::reader::{Length, TryClone},
}; };
use std::io::{Read, Seek, SeekFrom};
pub mod error; pub mod error;
pub mod metadata; pub mod metadata;
pub mod stats; pub mod stats;
pub mod writer; pub mod writer;
/// Adapts an object that implements Read+Seek to something that also
/// implements the parquet TryClone interface, required by the parquet
/// reader. This is provided so users of this crate do not have to
/// implement a parquet specific trait
struct InputReaderAdapter<R>
where
R: Read + Seek,
{
real_reader: R,
size: u64,
}
impl<R: Read + Seek> InputReaderAdapter<R> {
fn new(real_reader: R, size: u64) -> InputReaderAdapter<R> {
InputReaderAdapter { real_reader, size }
}
}
impl<R: Read + Seek> Read for InputReaderAdapter<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.real_reader.read(buf)
}
}
impl<R: Read + Seek> Seek for InputReaderAdapter<R> {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
self.real_reader.seek(pos)
}
}
impl<R: Read + Seek> TryClone for InputReaderAdapter<R> {
fn try_clone(&self) -> std::result::Result<Self, ParquetError> {
Err(ParquetError::NYI(String::from("TryClone for input reader")))
}
}
impl<R: Read + Seek> Length for InputReaderAdapter<R> {
fn len(&self) -> u64 {
self.size
}
}

View File

@ -10,7 +10,7 @@ use delorean_table_schema::DataType;
use crate::{ use crate::{
error::{Error, Result}, error::{Error, Result},
InputReaderAdapter, Length, TryClone,
}; };
pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String { pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String {
@ -36,14 +36,13 @@ pub fn data_type_from_parquet_type(parquet_type: parquet::basic::Type) -> DataTy
/// Print parquet metadata that can be read from `input`, with a total /// Print parquet metadata that can be read from `input`, with a total
/// size of `input_size` byes /// size of `input_size` byes
pub fn print_parquet_metadata<R: 'static>(input: R, input_size: u64) -> Result<()> pub fn print_parquet_metadata<R: 'static>(input: R) -> Result<()>
where where
R: Read + Seek, R: Read + Seek + TryClone + Length,
{ {
let input_adapter = InputReaderAdapter::new(input, input_size); let input_len = input.len();
let reader = let reader = SerializedFileReader::new(input).map_err(|e| Error::ParquetLibraryError {
SerializedFileReader::new(input_adapter).map_err(|e| Error::ParquetLibraryError {
message: String::from("Creating parquet reader"), message: String::from("Creating parquet reader"),
source: e, source: e,
})?; })?;
@ -52,7 +51,7 @@ where
let file_metadata = parquet_metadata.file_metadata(); let file_metadata = parquet_metadata.file_metadata();
let num_columns = file_metadata.schema_descr().num_columns(); let num_columns = file_metadata.schema_descr().num_columns();
println!("Parquet file size: {} bytes", input_size); println!("Parquet file size: {} bytes", input_len);
println!( println!(
"Parquet file Schema: {}", "Parquet file Schema: {}",
parquet_schema_as_string(file_metadata.schema()).trim_end() parquet_schema_as_string(file_metadata.schema()).trim_end()

View File

@ -12,21 +12,18 @@ use delorean_table::stats::{ColumnStats, ColumnStatsBuilder};
use crate::{ use crate::{
error::{Error, Result}, error::{Error, Result},
metadata::data_type_from_parquet_type, metadata::data_type_from_parquet_type,
InputReaderAdapter, Length, TryClone,
}; };
/// Calculate storage statistics for a particular parquet file that can /// Calculate storage statistics for a particular parquet file that can
/// be read from `input`, with a total size of `input_size` byes /// be read from `input`, with a total size of `input_size` byes
/// ///
/// Returns a Vec of ColumnStats, one for each column in the input /// Returns a Vec of ColumnStats, one for each column in the input
pub fn col_stats<R: 'static>(input: R, input_size: u64) -> Result<Vec<ColumnStats>> pub fn col_stats<R: 'static>(input: R) -> Result<Vec<ColumnStats>>
where where
R: Read + Seek, R: Read + Seek + TryClone + Length,
{ {
let input_adapter = InputReaderAdapter::new(input, input_size); let reader = SerializedFileReader::new(input).map_err(|e| Error::ParquetLibraryError {
let reader =
SerializedFileReader::new(input_adapter).map_err(|e| Error::ParquetLibraryError {
message: String::from("Creating parquet reader"), message: String::from("Creating parquet reader"),
source: e, source: e,
})?; })?;

View File

@ -40,8 +40,7 @@ pub fn dump_meta(input_filename: &str) -> Result<()> {
Ok(()) Ok(())
} }
FileType::Parquet => { FileType::Parquet => {
let input_len = input_reader.len(); print_parquet_metadata(input_reader)
print_parquet_metadata(input_reader, input_len)
.map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?; .map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?;
Ok(()) Ok(())
} }

View File

@ -6,6 +6,7 @@ use std::io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom};
use std::path::Path; use std::path::Path;
use crate::commands::error::{Error, Result}; use crate::commands::error::{Error, Result};
use delorean_parquet::ParquetError;
#[derive(Debug)] #[derive(Debug)]
pub enum FileType { pub enum FileType {
@ -82,14 +83,18 @@ impl Seek for InputReader {
} }
} }
impl Read for InputReader { impl delorean_parquet::Length for InputReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn len(&self) -> u64 {
match self { match self {
InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf), InputReader::FileInputType(file_input_reader) => file_input_reader.file_size,
InputReader::MemoryInputType(memory_input_reader) => { InputReader::MemoryInputType(memory_input_reader) => memory_input_reader.file_size,
memory_input_reader.cursor.read(buf)
} }
} }
}
impl delorean_parquet::TryClone for InputReader {
fn try_clone(&self) -> std::result::Result<Self, ParquetError> {
Err(ParquetError::NYI(String::from("TryClone for input reader")))
} }
} }
@ -112,6 +117,17 @@ impl BufRead for InputReader {
} }
} }
impl Read for InputReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
match self {
InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf),
InputReader::MemoryInputType(memory_input_reader) => {
memory_input_reader.cursor.read(buf)
}
}
}
}
impl InputReader { impl InputReader {
pub fn file_type(&self) -> &FileType { pub fn file_type(&self) -> &FileType {
match self { match self {

View File

@ -30,7 +30,7 @@ pub fn stats(input_filename: &str) -> Result<()> {
let input_len = input_reader.len(); let input_len = input_reader.len();
( (
input_len, input_len,
col_stats(input_reader, input_len) col_stats(input_reader)
.map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?, .map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?,
) )
} }