From b22423621ba2ada95facc8764ff18f8d23d38e48 Mon Sep 17 00:00:00 2001 From: alamb Date: Tue, 23 Jun 2020 17:15:02 -0400 Subject: [PATCH] refactor: remove InputReaderAdapter --- delorean_parquet/src/lib.rs | 44 +------------------------------- delorean_parquet/src/metadata.rs | 19 +++++++------- delorean_parquet/src/stats.rs | 17 +++++------- src/commands/file_meta.rs | 3 +-- src/commands/input.rs | 28 +++++++++++++++----- src/commands/stats.rs | 2 +- 6 files changed, 41 insertions(+), 72 deletions(-) diff --git a/delorean_parquet/src/lib.rs b/delorean_parquet/src/lib.rs index b60c1712a2..99632d3ecb 100644 --- a/delorean_parquet/src/lib.rs +++ b/delorean_parquet/src/lib.rs @@ -2,55 +2,13 @@ #![deny(rust_2018_idioms)] #![warn(missing_debug_implementations, clippy::explicit_iter_loop)] +// Export the parts of pub use parquet::{ errors::ParquetError, file::reader::{Length, TryClone}, }; -use std::io::{Read, Seek, SeekFrom}; pub mod error; pub mod metadata; pub mod stats; pub mod writer; - -/// Adapts an object that implements Read+Seek to something that also -/// implements the parquet TryClone interface, required by the parquet -/// reader. This is provided so users of this crate do not have to -/// implement a parquet specific trait -struct InputReaderAdapter -where - R: Read + Seek, -{ - real_reader: R, - size: u64, -} - -impl InputReaderAdapter { - fn new(real_reader: R, size: u64) -> InputReaderAdapter { - InputReaderAdapter { real_reader, size } - } -} - -impl Read for InputReaderAdapter { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.real_reader.read(buf) - } -} - -impl Seek for InputReaderAdapter { - fn seek(&mut self, pos: SeekFrom) -> Result { - self.real_reader.seek(pos) - } -} - -impl TryClone for InputReaderAdapter { - fn try_clone(&self) -> std::result::Result { - Err(ParquetError::NYI(String::from("TryClone for input reader"))) - } -} - -impl Length for InputReaderAdapter { - fn len(&self) -> u64 { - self.size - } -} diff --git a/delorean_parquet/src/metadata.rs b/delorean_parquet/src/metadata.rs index aa7d801c24..ae92b6764b 100644 --- a/delorean_parquet/src/metadata.rs +++ b/delorean_parquet/src/metadata.rs @@ -10,7 +10,7 @@ use delorean_table_schema::DataType; use crate::{ error::{Error, Result}, - InputReaderAdapter, + Length, TryClone, }; pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String { @@ -36,23 +36,22 @@ pub fn data_type_from_parquet_type(parquet_type: parquet::basic::Type) -> DataTy /// Print parquet metadata that can be read from `input`, with a total /// size of `input_size` byes -pub fn print_parquet_metadata(input: R, input_size: u64) -> Result<()> +pub fn print_parquet_metadata(input: R) -> Result<()> where - R: Read + Seek, + R: Read + Seek + TryClone + Length, { - let input_adapter = InputReaderAdapter::new(input, input_size); + let input_len = input.len(); - let reader = - SerializedFileReader::new(input_adapter).map_err(|e| Error::ParquetLibraryError { - message: String::from("Creating parquet reader"), - source: e, - })?; + let reader = SerializedFileReader::new(input).map_err(|e| Error::ParquetLibraryError { + message: String::from("Creating parquet reader"), + source: e, + })?; let parquet_metadata = reader.metadata(); let file_metadata = parquet_metadata.file_metadata(); let num_columns = file_metadata.schema_descr().num_columns(); - println!("Parquet file size: {} bytes", input_size); + println!("Parquet file size: {} bytes", input_len); println!( "Parquet file Schema: {}", parquet_schema_as_string(file_metadata.schema()).trim_end() diff --git a/delorean_parquet/src/stats.rs b/delorean_parquet/src/stats.rs index bdc639ab94..ddf2350919 100644 --- a/delorean_parquet/src/stats.rs +++ b/delorean_parquet/src/stats.rs @@ -12,24 +12,21 @@ use delorean_table::stats::{ColumnStats, ColumnStatsBuilder}; use crate::{ error::{Error, Result}, metadata::data_type_from_parquet_type, - InputReaderAdapter, + Length, TryClone, }; /// Calculate storage statistics for a particular parquet file that can /// be read from `input`, with a total size of `input_size` byes /// /// Returns a Vec of ColumnStats, one for each column in the input -pub fn col_stats(input: R, input_size: u64) -> Result> +pub fn col_stats(input: R) -> Result> where - R: Read + Seek, + R: Read + Seek + TryClone + Length, { - let input_adapter = InputReaderAdapter::new(input, input_size); - - let reader = - SerializedFileReader::new(input_adapter).map_err(|e| Error::ParquetLibraryError { - message: String::from("Creating parquet reader"), - source: e, - })?; + let reader = SerializedFileReader::new(input).map_err(|e| Error::ParquetLibraryError { + message: String::from("Creating parquet reader"), + source: e, + })?; let mut stats_builders = BTreeMap::new(); diff --git a/src/commands/file_meta.rs b/src/commands/file_meta.rs index 06506a7803..031cc02050 100644 --- a/src/commands/file_meta.rs +++ b/src/commands/file_meta.rs @@ -40,8 +40,7 @@ pub fn dump_meta(input_filename: &str) -> Result<()> { Ok(()) } FileType::Parquet => { - let input_len = input_reader.len(); - print_parquet_metadata(input_reader, input_len) + print_parquet_metadata(input_reader) .map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?; Ok(()) } diff --git a/src/commands/input.rs b/src/commands/input.rs index 88dcd15b9f..1dc6fc5611 100644 --- a/src/commands/input.rs +++ b/src/commands/input.rs @@ -6,6 +6,7 @@ use std::io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom}; use std::path::Path; use crate::commands::error::{Error, Result}; +use delorean_parquet::ParquetError; #[derive(Debug)] pub enum FileType { @@ -82,17 +83,21 @@ impl Seek for InputReader { } } -impl Read for InputReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { +impl delorean_parquet::Length for InputReader { + fn len(&self) -> u64 { match self { - InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf), - InputReader::MemoryInputType(memory_input_reader) => { - memory_input_reader.cursor.read(buf) - } + InputReader::FileInputType(file_input_reader) => file_input_reader.file_size, + InputReader::MemoryInputType(memory_input_reader) => memory_input_reader.file_size, } } } +impl delorean_parquet::TryClone for InputReader { + fn try_clone(&self) -> std::result::Result { + Err(ParquetError::NYI(String::from("TryClone for input reader"))) + } +} + impl BufRead for InputReader { fn fill_buf(&mut self) -> io::Result<&[u8]> { match self { @@ -112,6 +117,17 @@ impl BufRead for InputReader { } } +impl Read for InputReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + match self { + InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf), + InputReader::MemoryInputType(memory_input_reader) => { + memory_input_reader.cursor.read(buf) + } + } + } +} + impl InputReader { pub fn file_type(&self) -> &FileType { match self { diff --git a/src/commands/stats.rs b/src/commands/stats.rs index 6d51f7f122..1f9b8b7b44 100644 --- a/src/commands/stats.rs +++ b/src/commands/stats.rs @@ -30,7 +30,7 @@ pub fn stats(input_filename: &str) -> Result<()> { let input_len = input_reader.len(); ( input_len, - col_stats(input_reader, input_len) + col_stats(input_reader) .map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?, ) }