From 68ce351a3a83794326583d005120101659a7a23c Mon Sep 17 00:00:00 2001 From: alamb Date: Tue, 23 Jun 2020 16:58:31 -0400 Subject: [PATCH 1/3] refactor: remove direct parquet dependency from delorean_ingest --- Cargo.lock | 1 - delorean_ingest/Cargo.toml | 7 ------- delorean_ingest/src/lib.rs | 8 ++++---- delorean_table/src/lib.rs | 1 + 4 files changed, 5 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23a58f1181..7bb88bb2c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -687,7 +687,6 @@ dependencies = [ "delorean_tsm", "env_logger", "log", - "parquet", "snafu", ] diff --git a/delorean_ingest/Cargo.toml b/delorean_ingest/Cargo.toml index 855a288665..bf797454f5 100644 --- a/delorean_ingest/Cargo.toml +++ b/delorean_ingest/Cargo.toml @@ -7,13 +7,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# I get a build error when I use this one: -# failed to run custom build command for `arrow-flight v0.17.0` -#parquet = "0.17.0" -# this, we are living on the edge and pull directly from the arrow repo. -# https://github.com/apache/arrow/commit/04a1867eeb58f0c515e7ee5a6300a8f61045a6cd -parquet = { git = "https://github.com/apache/arrow.git", rev="04a1867eeb58f0c515e7ee5a6300a8f61045a6cd", version = "1.0.0-SNAPSHOT" } - snafu = "0.6.2" env_logger = "0.7.1" diff --git a/delorean_ingest/src/lib.rs b/delorean_ingest/src/lib.rs index 70ec239928..0b83c87188 100644 --- a/delorean_ingest/src/lib.rs +++ b/delorean_ingest/src/lib.rs @@ -8,12 +8,13 @@ use std::collections::{BTreeMap, BTreeSet}; use std::io::{BufRead, Seek}; use log::debug; -use parquet::data_type::ByteArray; use snafu::{ResultExt, Snafu}; use delorean_line_parser::{FieldValue, ParsedLine}; -use delorean_table::packers::{Packer, Packers}; -use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError}; +use delorean_table::{ + packers::{Packer, Packers}, + ByteArray, DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError, +}; use delorean_table_schema::{DataType, Schema, SchemaBuilder}; use delorean_tsm::mapper::{map_field_columns, ColumnData, TSMMeasurementMapper}; use delorean_tsm::reader::{TSMBlockReader, TSMIndexReader}; @@ -703,7 +704,6 @@ mod delorean_ingest_tests { use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError}; use delorean_table_schema::ColumnDefinition; use delorean_test_helpers::approximately_equal; - use parquet::data_type::ByteArray; use std::sync::{Arc, Mutex}; diff --git a/delorean_table/src/lib.rs b/delorean_table/src/lib.rs index 7fbee485aa..a03ecd027f 100644 --- a/delorean_table/src/lib.rs +++ b/delorean_table/src/lib.rs @@ -5,6 +5,7 @@ use snafu::Snafu; use delorean_table_schema::Schema; pub use packers::{Packer, Packers}; +pub use parquet::data_type::ByteArray; #[derive(Snafu, Debug)] pub enum Error { From b22423621ba2ada95facc8764ff18f8d23d38e48 Mon Sep 17 00:00:00 2001 From: alamb Date: Tue, 23 Jun 2020 17:15:02 -0400 Subject: [PATCH 2/3] refactor: remove InputReaderAdapter --- delorean_parquet/src/lib.rs | 44 +------------------------------- delorean_parquet/src/metadata.rs | 19 +++++++------- delorean_parquet/src/stats.rs | 17 +++++------- src/commands/file_meta.rs | 3 +-- src/commands/input.rs | 28 +++++++++++++++----- src/commands/stats.rs | 2 +- 6 files changed, 41 insertions(+), 72 deletions(-) diff --git a/delorean_parquet/src/lib.rs b/delorean_parquet/src/lib.rs index b60c1712a2..99632d3ecb 100644 --- a/delorean_parquet/src/lib.rs +++ b/delorean_parquet/src/lib.rs @@ -2,55 +2,13 @@ #![deny(rust_2018_idioms)] #![warn(missing_debug_implementations, clippy::explicit_iter_loop)] +// Export the parts of pub use parquet::{ errors::ParquetError, file::reader::{Length, TryClone}, }; -use std::io::{Read, Seek, SeekFrom}; pub mod error; pub mod metadata; pub mod stats; pub mod writer; - -/// Adapts an object that implements Read+Seek to something that also -/// implements the parquet TryClone interface, required by the parquet -/// reader. This is provided so users of this crate do not have to -/// implement a parquet specific trait -struct InputReaderAdapter -where - R: Read + Seek, -{ - real_reader: R, - size: u64, -} - -impl InputReaderAdapter { - fn new(real_reader: R, size: u64) -> InputReaderAdapter { - InputReaderAdapter { real_reader, size } - } -} - -impl Read for InputReaderAdapter { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.real_reader.read(buf) - } -} - -impl Seek for InputReaderAdapter { - fn seek(&mut self, pos: SeekFrom) -> Result { - self.real_reader.seek(pos) - } -} - -impl TryClone for InputReaderAdapter { - fn try_clone(&self) -> std::result::Result { - Err(ParquetError::NYI(String::from("TryClone for input reader"))) - } -} - -impl Length for InputReaderAdapter { - fn len(&self) -> u64 { - self.size - } -} diff --git a/delorean_parquet/src/metadata.rs b/delorean_parquet/src/metadata.rs index aa7d801c24..ae92b6764b 100644 --- a/delorean_parquet/src/metadata.rs +++ b/delorean_parquet/src/metadata.rs @@ -10,7 +10,7 @@ use delorean_table_schema::DataType; use crate::{ error::{Error, Result}, - InputReaderAdapter, + Length, TryClone, }; pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String { @@ -36,23 +36,22 @@ pub fn data_type_from_parquet_type(parquet_type: parquet::basic::Type) -> DataTy /// Print parquet metadata that can be read from `input`, with a total /// size of `input_size` byes -pub fn print_parquet_metadata(input: R, input_size: u64) -> Result<()> +pub fn print_parquet_metadata(input: R) -> Result<()> where - R: Read + Seek, + R: Read + Seek + TryClone + Length, { - let input_adapter = InputReaderAdapter::new(input, input_size); + let input_len = input.len(); - let reader = - SerializedFileReader::new(input_adapter).map_err(|e| Error::ParquetLibraryError { - message: String::from("Creating parquet reader"), - source: e, - })?; + let reader = SerializedFileReader::new(input).map_err(|e| Error::ParquetLibraryError { + message: String::from("Creating parquet reader"), + source: e, + })?; let parquet_metadata = reader.metadata(); let file_metadata = parquet_metadata.file_metadata(); let num_columns = file_metadata.schema_descr().num_columns(); - println!("Parquet file size: {} bytes", input_size); + println!("Parquet file size: {} bytes", input_len); println!( "Parquet file Schema: {}", parquet_schema_as_string(file_metadata.schema()).trim_end() diff --git a/delorean_parquet/src/stats.rs b/delorean_parquet/src/stats.rs index bdc639ab94..ddf2350919 100644 --- a/delorean_parquet/src/stats.rs +++ b/delorean_parquet/src/stats.rs @@ -12,24 +12,21 @@ use delorean_table::stats::{ColumnStats, ColumnStatsBuilder}; use crate::{ error::{Error, Result}, metadata::data_type_from_parquet_type, - InputReaderAdapter, + Length, TryClone, }; /// Calculate storage statistics for a particular parquet file that can /// be read from `input`, with a total size of `input_size` byes /// /// Returns a Vec of ColumnStats, one for each column in the input -pub fn col_stats(input: R, input_size: u64) -> Result> +pub fn col_stats(input: R) -> Result> where - R: Read + Seek, + R: Read + Seek + TryClone + Length, { - let input_adapter = InputReaderAdapter::new(input, input_size); - - let reader = - SerializedFileReader::new(input_adapter).map_err(|e| Error::ParquetLibraryError { - message: String::from("Creating parquet reader"), - source: e, - })?; + let reader = SerializedFileReader::new(input).map_err(|e| Error::ParquetLibraryError { + message: String::from("Creating parquet reader"), + source: e, + })?; let mut stats_builders = BTreeMap::new(); diff --git a/src/commands/file_meta.rs b/src/commands/file_meta.rs index 06506a7803..031cc02050 100644 --- a/src/commands/file_meta.rs +++ b/src/commands/file_meta.rs @@ -40,8 +40,7 @@ pub fn dump_meta(input_filename: &str) -> Result<()> { Ok(()) } FileType::Parquet => { - let input_len = input_reader.len(); - print_parquet_metadata(input_reader, input_len) + print_parquet_metadata(input_reader) .map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?; Ok(()) } diff --git a/src/commands/input.rs b/src/commands/input.rs index 88dcd15b9f..1dc6fc5611 100644 --- a/src/commands/input.rs +++ b/src/commands/input.rs @@ -6,6 +6,7 @@ use std::io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom}; use std::path::Path; use crate::commands::error::{Error, Result}; +use delorean_parquet::ParquetError; #[derive(Debug)] pub enum FileType { @@ -82,17 +83,21 @@ impl Seek for InputReader { } } -impl Read for InputReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { +impl delorean_parquet::Length for InputReader { + fn len(&self) -> u64 { match self { - InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf), - InputReader::MemoryInputType(memory_input_reader) => { - memory_input_reader.cursor.read(buf) - } + InputReader::FileInputType(file_input_reader) => file_input_reader.file_size, + InputReader::MemoryInputType(memory_input_reader) => memory_input_reader.file_size, } } } +impl delorean_parquet::TryClone for InputReader { + fn try_clone(&self) -> std::result::Result { + Err(ParquetError::NYI(String::from("TryClone for input reader"))) + } +} + impl BufRead for InputReader { fn fill_buf(&mut self) -> io::Result<&[u8]> { match self { @@ -112,6 +117,17 @@ impl BufRead for InputReader { } } +impl Read for InputReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + match self { + InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf), + InputReader::MemoryInputType(memory_input_reader) => { + memory_input_reader.cursor.read(buf) + } + } + } +} + impl InputReader { pub fn file_type(&self) -> &FileType { match self { diff --git a/src/commands/stats.rs b/src/commands/stats.rs index 6d51f7f122..1f9b8b7b44 100644 --- a/src/commands/stats.rs +++ b/src/commands/stats.rs @@ -30,7 +30,7 @@ pub fn stats(input_filename: &str) -> Result<()> { let input_len = input_reader.len(); ( input_len, - col_stats(input_reader, input_len) + col_stats(input_reader) .map_err(|e| Error::UnableDumpToParquetMetadata { source: e })?, ) } From 2c4a9dba53fa517476f61652aed4387201ee6702 Mon Sep 17 00:00:00 2001 From: alamb Date: Tue, 23 Jun 2020 17:21:20 -0400 Subject: [PATCH 3/3] fix: cleanup comment + code order --- delorean_parquet/src/lib.rs | 2 +- src/commands/input.rs | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/delorean_parquet/src/lib.rs b/delorean_parquet/src/lib.rs index 99632d3ecb..ed683b8fbd 100644 --- a/delorean_parquet/src/lib.rs +++ b/delorean_parquet/src/lib.rs @@ -2,7 +2,7 @@ #![deny(rust_2018_idioms)] #![warn(missing_debug_implementations, clippy::explicit_iter_loop)] -// Export the parts of +// Export the parts of the parquet crate that are needed to interact with code in this crate pub use parquet::{ errors::ParquetError, file::reader::{Length, TryClone}, diff --git a/src/commands/input.rs b/src/commands/input.rs index 1dc6fc5611..83be5f25ce 100644 --- a/src/commands/input.rs +++ b/src/commands/input.rs @@ -83,6 +83,17 @@ impl Seek for InputReader { } } +impl Read for InputReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + match self { + InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf), + InputReader::MemoryInputType(memory_input_reader) => { + memory_input_reader.cursor.read(buf) + } + } + } +} + impl delorean_parquet::Length for InputReader { fn len(&self) -> u64 { match self { @@ -117,17 +128,6 @@ impl BufRead for InputReader { } } -impl Read for InputReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - match self { - InputReader::FileInputType(file_input_reader) => file_input_reader.reader.read(buf), - InputReader::MemoryInputType(memory_input_reader) => { - memory_input_reader.cursor.read(buf) - } - } - } -} - impl InputReader { pub fn file_type(&self) -> &FileType { match self {