chore: Update arrow (again) to pick up latest changes to datafusion (#345)
parent aaeb0d4c84
commit 2b8c04f2b4

@@ -78,7 +78,7 @@ checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
 [[package]]
 name = "arrow"
 version = "2.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=238a9497269f39ab4d5bf20c28c2431a1b4e6673#238a9497269f39ab4d5bf20c28c2431a1b4e6673"
+source = "git+https://github.com/apache/arrow.git?rev=599b458c68dfcba38fe5448913d4bb69723e1439#599b458c68dfcba38fe5448913d4bb69723e1439"
 dependencies = [
  "chrono",
  "csv",
@@ -647,9 +647,9 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "2.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=238a9497269f39ab4d5bf20c28c2431a1b4e6673#238a9497269f39ab4d5bf20c28c2431a1b4e6673"
+source = "git+https://github.com/apache/arrow.git?rev=599b458c68dfcba38fe5448913d4bb69723e1439#599b458c68dfcba38fe5448913d4bb69723e1439"
 dependencies = [
- "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=238a9497269f39ab4d5bf20c28c2431a1b4e6673)",
+ "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=599b458c68dfcba38fe5448913d4bb69723e1439)",
  "async-trait",
  "chrono",
  "clap",
@@ -719,7 +719,7 @@ dependencies = [
 name = "delorean_arrow"
 version = "0.1.0"
 dependencies = [
- "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=238a9497269f39ab4d5bf20c28c2431a1b4e6673)",
+ "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=599b458c68dfcba38fe5448913d4bb69723e1439)",
  "datafusion",
  "parquet",
 ]
@@ -2118,9 +2118,9 @@ dependencies = [
 [[package]]
 name = "parquet"
 version = "2.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=238a9497269f39ab4d5bf20c28c2431a1b4e6673#238a9497269f39ab4d5bf20c28c2431a1b4e6673"
+source = "git+https://github.com/apache/arrow.git?rev=599b458c68dfcba38fe5448913d4bb69723e1439#599b458c68dfcba38fe5448913d4bb69723e1439"
 dependencies = [
- "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=238a9497269f39ab4d5bf20c28c2431a1b4e6673)",
+ "arrow 2.0.0-SNAPSHOT (git+https://github.com/apache/arrow.git?rev=599b458c68dfcba38fe5448913d4bb69723e1439)",
  "brotli",
  "byteorder",
  "chrono",

@@ -11,10 +11,10 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for delorean, to
 [dependencies]
 # We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev

-# The version can be found here: https://github.com/apache/arrow/commit/238a9497269f39ab4d5bf20c28c2431a1b4e6673
+# The version can be found here: https://github.com/apache/arrow/commit/599b458c68dfcba38fe5448913d4bb69723e1439
 #
-arrow = { git = "https://github.com/apache/arrow.git", rev = "238a9497269f39ab4d5bf20c28c2431a1b4e6673" , features = ["simd"] }
-datafusion = { git = "https://github.com/apache/arrow.git", rev = "238a9497269f39ab4d5bf20c28c2431a1b4e6673" }
+arrow = { git = "https://github.com/apache/arrow.git", rev = "599b458c68dfcba38fe5448913d4bb69723e1439" , features = ["simd"] }
+datafusion = { git = "https://github.com/apache/arrow.git", rev = "599b458c68dfcba38fe5448913d4bb69723e1439" }
 # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
 # and we're not currently using it anyway
-parquet = { git = "https://github.com/apache/arrow.git", rev = "238a9497269f39ab4d5bf20c28c2431a1b4e6673", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
+parquet = { git = "https://github.com/apache/arrow.git", rev = "599b458c68dfcba38fe5448913d4bb69723e1439", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }

@@ -23,7 +23,6 @@ use std::{
     any::Any,
     fmt::{self, Debug},
     sync::Arc,
-    sync::Mutex,
 };

 use async_trait::async_trait;
@@ -188,10 +187,7 @@ impl ExecutionPlan for SchemaPivotExec {
     }

     /// Execute one partition and return an iterator over RecordBatch
-    async fn execute(
-        &self,
-        partition: usize,
-    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+    async fn execute(&self, partition: usize) -> Result<Box<dyn RecordBatchReader + Send>> {
         if 0 != partition {
             return Err(ExecutionError::General(format!(
                 "SchemaPivotExec invalid partition {}",
@@ -199,7 +195,7 @@ impl ExecutionPlan for SchemaPivotExec {
             )));
         }

-        let input_reader = self.input.execute(partition).await?;
+        let mut input_reader = self.input.execute(partition).await?;

         // Algorithm: for each column we haven't seen a value for yet,
         // check each input row;
@@ -216,11 +212,7 @@ impl ExecutionPlan for SchemaPivotExec {
         // use a loop so that we release the mutex once we have read each input_batch
         let mut keep_searching = true;
         while keep_searching {
-            let input_batch = input_reader
-                .lock()
-                .expect("locked input mutex")
-                .next()
-                .transpose()?;
+            let input_batch = input_reader.next().transpose()?;

             keep_searching = match input_batch {
                 Some(input_batch) => {
@@ -273,10 +265,7 @@ impl ExecutionPlan for SchemaPivotExec {
             RecordBatch::try_new(self.schema(), vec![Arc::new(column_name_builder.finish())])?;

         let batches = vec![Arc::new(batch)];
-        Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
-            self.schema(),
-            batches,
-        ))))
+        Ok(Box::new(RecordBatchIterator::new(self.schema(), batches)))
     }
 }

@@ -426,9 +415,8 @@ mod tests {
     }

     /// Return a StringSet extracted from the record batch
-    fn reader_to_stringset(reader: Arc<Mutex<dyn RecordBatchReader>>) -> StringSetRef {
+    fn reader_to_stringset(mut reader: Box<dyn RecordBatchReader>) -> StringSetRef {
         let mut batches = Vec::new();
-        let mut reader = reader.lock().expect("locking record batch reader");
         // process the record batches one by one
         while let Some(record_batch) = reader.next().transpose().expect("reading next batch") {
             batches.push(record_batch)

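The signature change above tracks the upstream datafusion API at the new rev: `ExecutionPlan::execute` now returns `Result<Box<dyn RecordBatchReader + Send>>` instead of `Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>`, so callers pull batches by iterating the reader directly rather than locking a mutex first. Below is a minimal sketch of that consumption pattern, not code from this commit; it assumes the reader's `next()` yields `Option<Result<RecordBatch, ArrowError>>`, matching the `.next().transpose()` calls in the diff, and `collect_batches` is a hypothetical helper name.

// Sketch only: drain a reader of the new boxed type into a Vec of batches.
// Assumes the RecordBatchReader trait from the pinned arrow rev iterates over
// Result<RecordBatch, ArrowError>, as the diff's `.next().transpose()` usage suggests.
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchReader};

fn collect_batches(
    mut reader: Box<dyn RecordBatchReader + Send>,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut batches = Vec::new();
    // No Mutex to lock anymore; `transpose` folds end-of-stream and
    // per-batch errors into a single `?`.
    while let Some(batch) = reader.next().transpose()? {
        batches.push(batch);
    }
    Ok(batches)
}
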
@@ -1027,13 +1027,13 @@ mod tests {
     async fn write_data_and_recover() -> Result {
         let mut dir = delorean_test_helpers::tmp_dir()?.into_path();

-        let expected_cpu_table = r#"+--------+------+------+-------+-------------+-------+------+---------+-----------+
-| region | host | user | other | str         | b     | time | new_tag | new_field |
-+--------+------+------+-------+-------------+-------+------+---------+-----------+
-| west   | A    | 23.2 | 1     | some string | true  | 10   |         | 0         |
-| west   | B    | 23.1 | 0     |             | false | 15   |         | 0         |
-|        | A    | 0    | 0     |             | false | 20   | foo     | 15.1      |
-+--------+------+------+-------+-------------+-------+------+---------+-----------+
+        let expected_cpu_table = r#"+--------+------+------+-------+-------------+------+------+---------+-----------+
+| region | host | user | other | str         | b    | time | new_tag | new_field |
++--------+------+------+-------+-------------+------+------+---------+-----------+
+| west   | A    | 23.2 | 1     | some string | true | 10   |         |           |
+| west   | B    | 23.1 |       |             |      | 15   |         |           |
+|        | A    |      |       |             |      | 20   | foo     | 15.1      |
++--------+------+------+-------+-------------+------+------+---------+-----------+
 "#;
         let expected_mem_table = r#"+--------+------+-------+------+
 | region | host | val   | time |
@@ -1110,14 +1110,15 @@ mod tests {
     async fn recover_partial_entries() -> Result {
         let mut dir = delorean_test_helpers::tmp_dir()?.into_path();

-        let expected_cpu_table = r#"+--------+------+------+-------+-------------+-------+------+---------+-----------+
-| region | host | user | other | str         | b     | time | new_tag | new_field |
-+--------+------+------+-------+-------------+-------+------+---------+-----------+
-| west   | A    | 23.2 | 1     | some string | true  | 10   |         | 0         |
-| west   | B    | 23.1 | 0     |             | false | 15   |         | 0         |
-|        | A    | 0    | 0     |             | false | 20   | foo     | 15.1      |
-+--------+------+------+-------+-------------+-------+------+---------+-----------+
+        let expected_cpu_table = r#"+--------+------+------+-------+-------------+------+------+---------+-----------+
+| region | host | user | other | str         | b    | time | new_tag | new_field |
++--------+------+------+-------+-------------+------+------+---------+-----------+
+| west   | A    | 23.2 | 1     | some string | true | 10   |         |           |
+| west   | B    | 23.1 |       |             |      | 15   |         |           |
+|        | A    |      |       |             |      | 20   | foo     | 15.1      |
++--------+------+------+-------+-------------+------+------+---------+-----------+
 "#;
+
         let expected_mem_table = r#"+--------+------+-------+------+
 | region | host | val   | time |
 +--------+------+-------+------+
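
The only change in these expected tables is how missing values appear: the updated output shows empty cells where the old output showed the column type's default (`0` / `false`), and the `b` column narrows because `false` no longer appears. Below is a minimal standalone sketch of NULLs printing as blank cells, assuming arrow's pretty-print utility (`arrow::util::pretty::pretty_format_batches`, behind the crate's pretty-print feature) at the pinned rev; this is an illustration, not code from the commit.

use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;

fn main() -> Result<(), arrow::error::ArrowError> {
    // A single nullable column: the second and third rows are NULL, not 0.
    let schema = Arc::new(Schema::new(vec![Field::new("other", DataType::Int64, true)]));
    let other = Int64Array::from(vec![Some(1), None, None]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(other)])?;

    // The NULL rows are expected to render as empty cells rather than "0",
    // which is what the updated expected tables above encode.
    println!("{}", pretty_format_batches(&[batch])?);
    Ok(())
}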