diff --git a/Cargo.lock b/Cargo.lock
index 4e1cefaafa..22fdebb7a3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -99,7 +99,7 @@ dependencies = [
 [[package]]
 name = "arrow"
 version = "3.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=eea8d352bf3196a29c1ccf5e6c7342913916048a#eea8d352bf3196a29c1ccf5e6c7342913916048a"
+source = "git+https://github.com/apache/arrow.git?rev=e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6#e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6"
 dependencies = [
  "chrono",
  "csv",
@@ -700,7 +700,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "3.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=eea8d352bf3196a29c1ccf5e6c7342913916048a#eea8d352bf3196a29c1ccf5e6c7342913916048a"
+source = "git+https://github.com/apache/arrow.git?rev=e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6#e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6"
 dependencies = [
  "arrow 3.0.0-SNAPSHOT",
  "async-trait",
@@ -2130,9 +2130,10 @@ dependencies = [
 [[package]]
 name = "parquet"
 version = "3.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=eea8d352bf3196a29c1ccf5e6c7342913916048a#eea8d352bf3196a29c1ccf5e6c7342913916048a"
+source = "git+https://github.com/apache/arrow.git?rev=e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6#e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6"
 dependencies = [
  "arrow 3.0.0-SNAPSHOT",
+ "base64 0.11.0",
  "brotli",
  "byteorder",
  "chrono",
diff --git a/benches/line_protocol_to_parquet.rs b/benches/line_protocol_to_parquet.rs
index e72e1ee0f0..5767137981 100644
--- a/benches/line_protocol_to_parquet.rs
+++ b/benches/line_protocol_to_parquet.rs
@@ -3,7 +3,7 @@ use delorean_ingest::{ConversionSettings, LineProtocolConverter};
 use delorean_line_parser::parse_lines;
 use delorean_parquet::{
     writer::{CompressionLevel, DeloreanParquetTableWriter},
-    ParquetError, TryClone,
+    TryClone,
 };
 use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
 use delorean_table_schema::Schema;
@@ -32,7 +32,7 @@ impl Seek for IgnoringWriteStream {
 }
 
 impl TryClone for IgnoringWriteStream {
-    fn try_clone(&self) -> std::result::Result<Self, ParquetError> {
+    fn try_clone(&self) -> std::result::Result<Self, std::io::Error> {
         Ok(IgnoringWriteStream {})
     }
 }
diff --git a/delorean_arrow/Cargo.toml b/delorean_arrow/Cargo.toml
index 30dc09c991..53f7f3645b 100644
--- a/delorean_arrow/Cargo.toml
+++ b/delorean_arrow/Cargo.toml
@@ -11,10 +11,10 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for delorean, to
 
 [dependencies]
 # We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev
-# The version can be found here: https://github.com/apache/arrow/commit/eea8d352bf3196a29c1ccf5e6c7342913916048a
+# The version can be found here: https://github.com/apache/arrow/commit/e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6
 #
-arrow = { git = "https://github.com/apache/arrow.git", rev = "eea8d352bf3196a29c1ccf5e6c7342913916048a" , features = ["simd"] }
-datafusion = { git = "https://github.com/apache/arrow.git", rev = "eea8d352bf3196a29c1ccf5e6c7342913916048a" }
+arrow = { git = "https://github.com/apache/arrow.git", rev = "e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6" , features = ["simd"] }
+datafusion = { git = "https://github.com/apache/arrow.git", rev = "e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6" }
 
 # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
 # and we're not currently using it anyway
-parquet = { git = "https://github.com/apache/arrow.git", rev = "eea8d352bf3196a29c1ccf5e6c7342913916048a", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
+parquet = { git = "https://github.com/apache/arrow.git", rev = "e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
diff --git a/delorean_ingest/src/lib.rs b/delorean_ingest/src/lib.rs
index 1d20476350..e0cc30a4a7 100644
--- a/delorean_ingest/src/lib.rs
+++ b/delorean_ingest/src/lib.rs
@@ -23,7 +23,7 @@ use delorean_tsm/{
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{
     collections::{BTreeMap, BTreeSet},
-    io::{BufRead, Seek},
+    io::{Read, Seek},
 };
 use tracing::debug;
@@ -680,7 +680,7 @@ impl TSMFileConverter {
         mut block_readers: Vec<R>,
     ) -> Result<(), Error>
     where
-        R: BufRead + Seek,
+        R: Read + Seek,
     {
         if index_readers.is_empty() {
             return Err(Error::TSMProcessing {
diff --git a/delorean_parquet/src/lib.rs b/delorean_parquet/src/lib.rs
index 7d64302418..f1bdcef432 100644
--- a/delorean_parquet/src/lib.rs
+++ b/delorean_parquet/src/lib.rs
@@ -10,7 +10,8 @@
 // Export the parts of the parquet crate that are needed to interact with code in this crate
 pub use delorean_arrow::parquet::{
     errors::ParquetError,
-    file::reader::{Length, TryClone},
+    file::reader::{ChunkReader, Length},
+    file::writer::TryClone,
 };
 
 pub mod error;
diff --git a/delorean_parquet/src/metadata.rs b/delorean_parquet/src/metadata.rs
index bf72717480..37bd7f4851 100644
--- a/delorean_parquet/src/metadata.rs
+++ b/delorean_parquet/src/metadata.rs
@@ -6,9 +6,8 @@ use delorean_arrow::parquet::{
 };
 use delorean_table_schema::DataType;
 use snafu::ResultExt;
-use std::io::{Read, Seek};
 
-use crate::{error::Result, Length, TryClone};
+use crate::{error::Result, ChunkReader};
 
 pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String {
     let mut parquet_schema_string = Vec::new();
@@ -35,7 +34,7 @@ pub fn data_type_from_parquet_type(parquet_type: parquet::basic::Type) -> DataTy
 /// size of `input_size` byes
 pub fn print_parquet_metadata<R: 'static>(input: R) -> Result<()>
 where
-    R: Read + Seek + TryClone + Length,
+    R: ChunkReader,
 {
     let input_len = input.len();
diff --git a/delorean_parquet/src/stats.rs b/delorean_parquet/src/stats.rs
index 8bdbf2ec11..3a43395a42 100644
--- a/delorean_parquet/src/stats.rs
+++ b/delorean_parquet/src/stats.rs
@@ -8,14 +8,10 @@ use delorean_table::{
     Name,
 };
 use snafu::ResultExt;
-use std::{
-    collections::BTreeMap,
-    convert::TryInto,
-    io::{Read, Seek},
-};
+use std::{collections::BTreeMap, convert::TryInto};
 use tracing::debug;
 
-use crate::{error::Result, metadata::data_type_from_parquet_type, Length, TryClone};
+use crate::{error::Result, metadata::data_type_from_parquet_type, ChunkReader};
 
 /// Calculate storage statistics for a particular parquet "file" that can
 /// be read from `input`, with a total size of `input_size` byes
 ///
 /// Statistics include the total size of all column data as well as the
 /// null counts for each
@@ -24,7 +20,7 @@ use crate::{error::Result, metadata::data_type_from_parquet_type, Length, TryClo
 /// columns across all column chunks.
 pub fn file_stats<R: 'static>(input: R) -> Result<FileStats>
 where
-    R: Read + Seek + TryClone + Length + Name,
+    R: ChunkReader + Name,
 {
     let mut file_stats_builder = FileStatsBuilder::new(&input.name(), input.len());
     let reader = SerializedFileReader::new(input).context(crate::error::ParquetLibraryError {
diff --git a/delorean_parquet/src/writer.rs b/delorean_parquet/src/writer.rs
index ac1f4e428d..82bc3bfc19 100644
--- a/delorean_parquet/src/writer.rs
+++ b/delorean_parquet/src/writer.rs
@@ -5,11 +5,11 @@ use delorean_arrow::parquet::{
     errors::ParquetError,
     file::{
         properties::{WriterProperties, WriterPropertiesBuilder},
-        reader::TryClone,
-        writer::{FileWriter, SerializedFileWriter},
+        writer::{FileWriter, SerializedFileWriter, TryClone},
     },
     schema::types::{ColumnPath, Type},
 };
+use parquet::file::writer::ParquetWriter;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
     fmt,
@@ -78,7 +78,7 @@ impl FromStr for CompressionLevel {
 /// represented using the structures in `delorean_table` to parquet files.
 pub struct DeloreanParquetTableWriter<W>
 where
-    W: Write + Seek + TryClone,
+    W: ParquetWriter,
 {
     parquet_schema: Rc<Type>,
     file_writer: SerializedFileWriter<W>,
diff --git a/delorean_storage/src/exec/seriesset.rs b/delorean_storage/src/exec/seriesset.rs
index bd73a71a10..71b2cc742f 100644
--- a/delorean_storage/src/exec/seriesset.rs
+++ b/delorean_storage/src/exec/seriesset.rs
@@ -909,6 +909,7 @@ mod tests {
         let has_header = false;
         let delimiter = Some(b',');
         let batch_size = 1000;
+        let bounds = None;
         let projection = None;
         let mut reader = csv::Reader::new(
             data.as_bytes(),
@@ -916,6 +917,7 @@
             has_header,
             delimiter,
             batch_size,
+            bounds,
             projection,
         );
diff --git a/src/commands/input.rs b/src/commands/input.rs
index 199242ab85..47d2a7222c 100644
--- a/src/commands/input.rs
+++ b/src/commands/input.rs
@@ -1,4 +1,5 @@
-use delorean_parquet::ParquetError;
+use delorean_arrow::parquet::file::serialized_reader::{FileSource, SliceableCursor};
+use delorean_parquet::ChunkReader;
 use delorean_table::Name;
 /// Module to handle input files (and maybe urls?)
 use libflate::gzip;
@@ -9,7 +10,7 @@ use std::{
     fs,
     fs::File,
     io,
-    io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom},
+    io::{BufReader, Read, Seek, SeekFrom},
     path::{Path, PathBuf},
 };
 
@@ -91,7 +92,7 @@ pub struct MemoryInputReader {
     file_type: FileType,
     file_size: u64,
     path: PathBuf,
-    cursor: Cursor<Vec<u8>>,
+    cursor: SliceableCursor,
 }
 
 impl FileInputReader {
@@ -120,7 +121,7 @@ impl MemoryInputReader {
             file_type,
             file_size: len as u64,
             path,
-            cursor: Cursor::new(buffer),
+            cursor: SliceableCursor::new(buffer),
         }
     }
 }
@@ -152,24 +153,32 @@ impl delorean_parquet::Length for InputReader {
     }
 }
 
-impl delorean_parquet::TryClone for InputReader {
-    fn try_clone(&self) -> std::result::Result<Self, ParquetError> {
-        Err(ParquetError::NYI(String::from("TryClone for input reader")))
+impl ChunkReader for InputReader {
+    type T = InputSlice;
+    fn get_read(
+        &self,
+        start: u64,
+        length: usize,
+    ) -> delorean_arrow::parquet::errors::Result<Self::T> {
+        match self {
+            Self::FileInputType(file_input_reader) => Ok(InputSlice::FileSlice(FileSource::new(
+                file_input_reader.reader.get_ref(),
+                start,
+                length,
+            ))),
+            Self::MemoryInputType(memory_input_reader) => Ok(InputSlice::Memory(
+                memory_input_reader.cursor.get_read(start, length)?,
+            )),
+        }
     }
 }
 
-impl BufRead for InputReader {
-    fn fill_buf(&mut self) -> io::Result<&[u8]> {
-        match self {
-            Self::FileInputType(file_input_reader) => file_input_reader.reader.fill_buf(),
-            Self::MemoryInputType(memory_input_reader) => memory_input_reader.cursor.fill_buf(),
-        }
-    }
-    fn consume(&mut self, amt: usize) {
-        match self {
-            Self::FileInputType(file_input_reader) => file_input_reader.reader.consume(amt),
-            Self::MemoryInputType(memory_input_reader) => memory_input_reader.cursor.consume(amt),
-        }
+impl delorean_parquet::TryClone for InputReader {
+    fn try_clone(&self) -> std::result::Result<Self, std::io::Error> {
+        Err(io::Error::new(
+            io::ErrorKind::Other,
+            "TryClone for input reader not supported",
+        ))
     }
 }
 
@@ -335,3 +344,17 @@ impl InputPath {
             .map(|p| InputReader::new(&p.to_string_lossy()))
     }
 }
+
+pub enum InputSlice {
+    FileSlice(FileSource<File>),
+    Memory(SliceableCursor),
+}
+
+impl Read for InputSlice {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        match self {
+            Self::FileSlice(src) => src.read(buf),
+            Self::Memory(src) => src.read(buf),
+        }
+    }
+}