chore: Upgrade version of Arrow / DataFusion (3 of 3) + update code for new interfaces (#395)

Andrew Lamb 2020-11-02 11:20:44 -05:00 committed by GitHub
parent 8303dd197b
commit 9f36914351
10 changed files with 66 additions and 44 deletions

Cargo.lock (generated)

@@ -99,7 +99,7 @@ dependencies = [
 [[package]]
 name = "arrow"
 version = "3.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=eea8d352bf3196a29c1ccf5e6c7342913916048a#eea8d352bf3196a29c1ccf5e6c7342913916048a"
+source = "git+https://github.com/apache/arrow.git?rev=e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6#e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6"
 dependencies = [
  "chrono",
  "csv",
@@ -700,7 +700,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "3.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=eea8d352bf3196a29c1ccf5e6c7342913916048a#eea8d352bf3196a29c1ccf5e6c7342913916048a"
+source = "git+https://github.com/apache/arrow.git?rev=e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6#e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6"
 dependencies = [
  "arrow 3.0.0-SNAPSHOT",
  "async-trait",
@@ -2130,9 +2130,10 @@ dependencies = [
 [[package]]
 name = "parquet"
 version = "3.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=eea8d352bf3196a29c1ccf5e6c7342913916048a#eea8d352bf3196a29c1ccf5e6c7342913916048a"
+source = "git+https://github.com/apache/arrow.git?rev=e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6#e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6"
 dependencies = [
  "arrow 3.0.0-SNAPSHOT",
+ "base64 0.11.0",
  "brotli",
  "byteorder",
  "chrono",


@@ -3,7 +3,7 @@ use delorean_ingest::{ConversionSettings, LineProtocolConverter};
 use delorean_line_parser::parse_lines;
 use delorean_parquet::{
     writer::{CompressionLevel, DeloreanParquetTableWriter},
-    ParquetError, TryClone,
+    TryClone,
 };
 use delorean_table::{DeloreanTableWriter, DeloreanTableWriterSource, Error as TableError};
 use delorean_table_schema::Schema;
@@ -32,7 +32,7 @@ impl Seek for IgnoringWriteStream {
 }
 impl TryClone for IgnoringWriteStream {
-    fn try_clone(&self) -> std::result::Result<Self, ParquetError> {
+    fn try_clone(&self) -> std::result::Result<Self, std::io::Error> {
         Ok(IgnoringWriteStream {})
     }
 }


@@ -11,10 +11,10 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for delorean, to
 [dependencies]
 # We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev
-# The version can be found here: https://github.com/apache/arrow/commit/eea8d352bf3196a29c1ccf5e6c7342913916048a
+# The version can be found here: https://github.com/apache/arrow/commit/e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6
 #
-arrow = { git = "https://github.com/apache/arrow.git", rev = "eea8d352bf3196a29c1ccf5e6c7342913916048a" , features = ["simd"] }
-datafusion = { git = "https://github.com/apache/arrow.git", rev = "eea8d352bf3196a29c1ccf5e6c7342913916048a" }
+arrow = { git = "https://github.com/apache/arrow.git", rev = "e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6" , features = ["simd"] }
+datafusion = { git = "https://github.com/apache/arrow.git", rev = "e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6" }
 # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
 # and we're not currently using it anyway
-parquet = { git = "https://github.com/apache/arrow.git", rev = "eea8d352bf3196a29c1ccf5e6c7342913916048a", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
+parquet = { git = "https://github.com/apache/arrow.git", rev = "e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }


@@ -23,7 +23,7 @@ use delorean_tsm::{
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{
     collections::{BTreeMap, BTreeSet},
-    io::{BufRead, Seek},
+    io::{Read, Seek},
 };
 use tracing::debug;
@@ -680,7 +680,7 @@ impl TSMFileConverter {
         mut block_readers: Vec<R>,
     ) -> Result<(), Error>
     where
-        R: BufRead + Seek,
+        R: Read + Seek,
     {
         if index_readers.is_empty() {
             return Err(Error::TSMProcessing {


@@ -10,7 +10,8 @@
 // Export the parts of the parquet crate that are needed to interact with code in this crate
 pub use delorean_arrow::parquet::{
     errors::ParquetError,
-    file::reader::{Length, TryClone},
+    file::reader::{ChunkReader, Length},
+    file::writer::TryClone,
 };
 pub mod error;
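With `ChunkReader` re-exported in place of the old `Read + Seek + TryClone + Length` combination, downstream code can be written against a single bound. A minimal sketch of a consumer of these re-exports (`count_rows` is a hypothetical helper, not part of this PR; the paths are assumed from the other hunks in this commit):

```rust
use delorean_arrow::parquet::{
    errors::ParquetError,
    file::{reader::FileReader, serialized_reader::SerializedFileReader},
};
use delorean_parquet::ChunkReader;

/// Hypothetical helper: open the parquet data behind any `ChunkReader`
/// (a file, an in-memory cursor, ...) and report its row count.
fn count_rows<R: ChunkReader + 'static>(input: R) -> Result<i64, ParquetError> {
    // The single `ChunkReader` bound is all `SerializedFileReader` asks for now.
    let reader = SerializedFileReader::new(input)?;
    Ok(reader.metadata().file_metadata().num_rows())
}
```

The parquet crate itself provides a `ChunkReader` impl for `std::fs::File`; in-memory readers (see the `InputReader` changes further down) implement it by hand.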


@@ -6,9 +6,8 @@ use delorean_arrow::parquet::{
 };
 use delorean_table_schema::DataType;
 use snafu::ResultExt;
-use std::io::{Read, Seek};
-use crate::{error::Result, Length, TryClone};
+use crate::{error::Result, ChunkReader};
 pub fn parquet_schema_as_string(parquet_schema: &schema::types::Type) -> String {
     let mut parquet_schema_string = Vec::new();
@@ -35,7 +34,7 @@ pub fn data_type_from_parquet_type(parquet_type: parquet::basic::Type) -> DataType {
 /// size of `input_size` bytes
 pub fn print_parquet_metadata<R: 'static>(input: R) -> Result<()>
 where
-    R: Read + Seek + TryClone + Length,
+    R: ChunkReader,
 {
     let input_len = input.len();
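Since the parquet crate ships a `ChunkReader` impl for `std::fs::File`, the simplest call sites keep working under the new bound; a minimal usage sketch, written as if it lived alongside `print_parquet_metadata` (the path is made up):

```rust
use std::fs::File;

/// Hypothetical caller: dump metadata for a parquet file on disk.
fn print_example_metadata() -> crate::error::Result<()> {
    // `File` satisfies `ChunkReader`, so no wrapper type is needed.
    let file = File::open("example.parquet").expect("open example file"); // made-up path
    print_parquet_metadata(file)
}
```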


@@ -8,14 +8,10 @@ use delorean_table::{
     Name,
 };
 use snafu::ResultExt;
-use std::{
-    collections::BTreeMap,
-    convert::TryInto,
-    io::{Read, Seek},
-};
+use std::{collections::BTreeMap, convert::TryInto};
 use tracing::debug;
-use crate::{error::Result, metadata::data_type_from_parquet_type, Length, TryClone};
+use crate::{error::Result, metadata::data_type_from_parquet_type, ChunkReader};
 /// Calculate storage statistics for a particular parquet "file" that can
 /// be read from `input`, with a total size of `input_size` bytes
@@ -24,7 +20,7 @@ use crate::{error::Result, metadata::data_type_from_parquet_type, Length, TryClone};
 /// columns across all column chunks.
 pub fn file_stats<R: 'static>(input: R) -> Result<FileStats>
 where
-    R: Read + Seek + TryClone + Length + Name,
+    R: ChunkReader + Name,
 {
     let mut file_stats_builder = FileStatsBuilder::new(&input.name(), input.len());
     let reader = SerializedFileReader::new(input).context(crate::error::ParquetLibraryError {


@@ -5,11 +5,11 @@ use delorean_arrow::parquet::{
     errors::ParquetError,
     file::{
         properties::{WriterProperties, WriterPropertiesBuilder},
-        reader::TryClone,
-        writer::{FileWriter, SerializedFileWriter},
+        writer::{FileWriter, SerializedFileWriter, TryClone},
     },
     schema::types::{ColumnPath, Type},
 };
+use parquet::file::writer::ParquetWriter;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
     fmt,
@@ -78,7 +78,7 @@ impl FromStr for CompressionLevel {
 /// represented using the structures in `delorean_table` to parquet files.
 pub struct DeloreanParquetTableWriter<W>
 where
-    W: Write + Seek + TryClone,
+    W: ParquetWriter,
 {
     parquet_schema: Rc<parquet::schema::types::Type>,
     file_writer: SerializedFileWriter<W>,
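The new `W: ParquetWriter` bound bundles what `SerializedFileWriter` needs. A sketch of the shape that trait is assumed to have at this rev (paraphrased, not copied from the parquet crate):

```rust
use std::io::{Seek, Write};

// Assumed: `TryClone` now lives in `file::writer` and reports `std::io::Error`,
// matching the signature change in the hunks above.
pub trait TryClone: Sized {
    fn try_clone(&self) -> std::result::Result<Self, std::io::Error>;
}

// Assumed: a marker trait with a blanket impl, so anything that is
// `Write + Seek + TryClone` (e.g. `std::fs::File`) still satisfies the
// shorter `W: ParquetWriter` bound on `DeloreanParquetTableWriter`.
pub trait ParquetWriter: Write + Seek + TryClone {}
impl<T: Write + Seek + TryClone> ParquetWriter for T {}
```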


@@ -909,6 +909,7 @@ mod tests {
         let has_header = false;
         let delimiter = Some(b',');
         let batch_size = 1000;
+        let bounds = None;
         let projection = None;
         let mut reader = csv::Reader::new(
             data.as_bytes(),
@@ -916,6 +917,7 @@ mod tests {
             has_header,
             delimiter,
             batch_size,
+            bounds,
             projection,
         );


@@ -1,4 +1,5 @@
-use delorean_parquet::ParquetError;
+use delorean_arrow::parquet::file::serialized_reader::{FileSource, SliceableCursor};
+use delorean_parquet::ChunkReader;
 use delorean_table::Name;
 /// Module to handle input files (and maybe urls?)
 use libflate::gzip;
@@ -9,7 +10,7 @@ use std::{
     fs,
     fs::File,
     io,
-    io::{BufRead, BufReader, Cursor, Read, Seek, SeekFrom},
+    io::{BufReader, Read, Seek, SeekFrom},
     path::{Path, PathBuf},
 };
@@ -91,7 +92,7 @@ pub struct MemoryInputReader {
     file_type: FileType,
     file_size: u64,
     path: PathBuf,
-    cursor: Cursor<Vec<u8>>,
+    cursor: SliceableCursor,
 }
 impl FileInputReader {
@@ -120,7 +121,7 @@ impl MemoryInputReader {
             file_type,
             file_size: len as u64,
             path,
-            cursor: Cursor::new(buffer),
+            cursor: SliceableCursor::new(buffer),
         }
     }
 }
@@ -152,24 +153,32 @@ impl delorean_parquet::Length for InputReader {
     }
 }
-impl delorean_parquet::TryClone for InputReader {
-    fn try_clone(&self) -> std::result::Result<Self, ParquetError> {
-        Err(ParquetError::NYI(String::from("TryClone for input reader")))
+impl ChunkReader for InputReader {
+    type T = InputSlice;
+    fn get_read(
+        &self,
+        start: u64,
+        length: usize,
+    ) -> delorean_arrow::parquet::errors::Result<Self::T> {
+        match self {
+            Self::FileInputType(file_input_reader) => Ok(InputSlice::FileSlice(FileSource::new(
+                file_input_reader.reader.get_ref(),
+                start,
+                length,
+            ))),
+            Self::MemoryInputType(memory_input_reader) => Ok(InputSlice::Memory(
+                memory_input_reader.cursor.get_read(start, length)?,
+            )),
+        }
     }
 }
-impl BufRead for InputReader {
-    fn fill_buf(&mut self) -> io::Result<&[u8]> {
-        match self {
-            Self::FileInputType(file_input_reader) => file_input_reader.reader.fill_buf(),
-            Self::MemoryInputType(memory_input_reader) => memory_input_reader.cursor.fill_buf(),
-        }
-    }
-    fn consume(&mut self, amt: usize) {
-        match self {
-            Self::FileInputType(file_input_reader) => file_input_reader.reader.consume(amt),
-            Self::MemoryInputType(memory_input_reader) => memory_input_reader.cursor.consume(amt),
-        }
+impl delorean_parquet::TryClone for InputReader {
+    fn try_clone(&self) -> std::result::Result<Self, std::io::Error> {
+        Err(io::Error::new(
+            io::ErrorKind::Other,
+            "TryClone for input reader not supported",
+        ))
     }
 }
@@ -335,3 +344,17 @@ impl InputPath {
             .map(|p| InputReader::new(&p.to_string_lossy()))
     }
 }
+pub enum InputSlice {
+    FileSlice(FileSource<File>),
+    Memory(SliceableCursor),
+}
+impl Read for InputSlice {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        match self {
+            Self::FileSlice(src) => src.read(buf),
+            Self::Memory(src) => src.read(buf),
+        }
+    }
+}
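With `ChunkReader` implemented, `InputReader` can be handed straight to the `ChunkReader`-bounded parquet code instead of being driven through `BufRead`. A minimal sketch, written as if it lived next to `InputReader` (the path is made up and error handling is reduced to `expect`):

```rust
use delorean_arrow::parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};

/// Hypothetical helper: count the row groups in a parquet input, whether the
/// `InputReader` ends up file-backed or memory-backed.
fn row_group_count(path: &str) -> usize {
    let input = InputReader::new(path).expect("open input");
    let reader = SerializedFileReader::new(input).expect("valid parquet");
    reader.metadata().num_row_groups()
}
```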