chore: Update arrow dependencies, remove custom min/max implementation (#585)

* chore: Update arrow dependency

* fix: Update code for changes in datafusion

* fix: use arrow version of min_boolean
pull/24376/head
Andrew Lamb 2020-12-21 12:31:39 -05:00 committed by GitHub
parent e96d309a5e
commit bb96142564
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 23 additions and 58 deletions

15
Cargo.lock generated
View File

@ -86,8 +86,9 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "arrow"
version = "3.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=edff65dfe73d3380a211b678f3760a60a66e5544#edff65dfe73d3380a211b678f3760a60a66e5544"
source = "git+https://github.com/apache/arrow.git?rev=9724afd732481115c675fe68973d741c5530d23d#9724afd732481115c675fe68973d741c5530d23d"
dependencies = [
"cfg_aliases",
"chrono",
"csv",
"flatbuffers",
@ -363,6 +364,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cfg_aliases"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e"
[[package]]
name = "chrono"
version = "0.4.19"
@ -729,7 +736,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "3.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=edff65dfe73d3380a211b678f3760a60a66e5544#edff65dfe73d3380a211b678f3760a60a66e5544"
source = "git+https://github.com/apache/arrow.git?rev=9724afd732481115c675fe68973d741c5530d23d#9724afd732481115c675fe68973d741c5530d23d"
dependencies = [
"ahash 0.6.2",
"arrow",
@ -2075,10 +2082,10 @@ dependencies = [
[[package]]
name = "parquet"
version = "3.0.0-SNAPSHOT"
source = "git+https://github.com/apache/arrow.git?rev=edff65dfe73d3380a211b678f3760a60a66e5544#edff65dfe73d3380a211b678f3760a60a66e5544"
source = "git+https://github.com/apache/arrow.git?rev=9724afd732481115c675fe68973d741c5530d23d#9724afd732481115c675fe68973d741c5530d23d"
dependencies = [
"arrow",
"base64 0.13.0",
"base64 0.12.3",
"brotli",
"byteorder",
"chrono",

View File

@ -11,10 +11,10 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx
[dependencies]
# We are using a development version of arrow/parquet/datafusion, and all three dependencies are pinned to the same rev
# The version can be found here: https://github.com/apache/arrow/commit/edff65dfe73d3380a211b678f3760a60a66e5544
# The version can be found here: https://github.com/apache/arrow/commit/9724afd732481115c675fe68973d741c5530d23d
#
arrow = { git = "https://github.com/apache/arrow.git", rev = "edff65dfe73d3380a211b678f3760a60a66e5544" , features = ["simd"] }
datafusion = { git = "https://github.com/apache/arrow.git", rev = "edff65dfe73d3380a211b678f3760a60a66e5544" }
arrow = { git = "https://github.com/apache/arrow.git", rev = "9724afd732481115c675fe68973d741c5530d23d" , features = ["simd"] }
datafusion = { git = "https://github.com/apache/arrow.git", rev = "9724afd732481115c675fe68973d741c5530d23d" }
# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
# and we're not currently using it anyway
parquet = { git = "https://github.com/apache/arrow.git", rev = "edff65dfe73d3380a211b678f3760a60a66e5544", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }
parquet = { git = "https://github.com/apache/arrow.git", rev = "9724afd732481115c675fe68973d741c5530d23d", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] }

View File

@ -13,6 +13,7 @@ use arrow_deps::{
execution::context::{ExecutionContextState, QueryPlanner},
logical_plan::{Expr, LogicalPlan, UserDefinedLogicalNode},
physical_plan::{
collect,
functions::ScalarFunctionImplementation,
merge::MergeExec,
planner::{DefaultPhysicalPlanner, ExtensionPlanner},
@ -126,7 +127,7 @@ impl IOxExecutionContext {
debug!("Running plan, physical:\n{:?}", physical_plan);
self.inner.collect(physical_plan).await
collect(physical_plan).await
}
/// Executes the physical plan and produces a RecordBatchStream to stream

View File

@ -10,8 +10,8 @@ use std::fmt::Debug;
use arrow_deps::{
arrow::compute::kernels::aggregate::{
max as array_max, max_string as array_max_string, min as array_min,
min_string as array_min_string,
max as array_max, max_boolean as array_max_boolean, max_string as array_max_string,
min as array_min, min_boolean as array_min_boolean, min_string as array_min_string,
},
arrow::{
array::{Array, ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray},
@ -22,48 +22,6 @@ use arrow_deps::{
use super::{Selector, SelectorOutput};
/// Computes the minimum or maximum of a `BooleanArray`, skipping nulls.
///
/// The arrow aggregate kernels do not include a min or max for
/// booleans (arguably a silly operation), so it is provided here for
/// completeness. The approach is adapted from `min_max_helper` in
/// arrow's aggregate.rs.
///
/// `cmp(current, candidate)` must return `true` when `candidate`
/// should replace `current` as the running result.
///
/// Returns `None` when the array is empty or holds only nulls.
///
/// This code should be contributed upstream and then removed from
/// here once arrow gains the feature:
/// https://issues.apache.org/jira/browse/ARROW-10944
fn min_max_helper<F>(array: &BooleanArray, cmp: F) -> Option<bool>
where
    F: Fn(bool, bool) -> bool,
{
    // Nothing to aggregate if every slot is null; an empty array also
    // lands here because 0 == 0.
    if array.len() == array.null_count() {
        return None;
    }

    // The guard above guarantees at least one element, so `next()`
    // cannot yield `None` here. The first slot itself may still be
    // null, which the loop below copes with.
    let seed: Option<bool> = array.iter().next().unwrap();

    let mut best = seed;
    for candidate in array.iter() {
        best = match (best, candidate) {
            (Some(current), Some(value)) => Some(if cmp(current, value) { value } else { current }),
            // A null candidate never changes the running result.
            (current, None) => current,
            // First non-null value seen so far.
            (None, value) => value,
        };
    }
    best
}
/// Returns the smallest non-null value in `array` (`false` orders
/// before `true`), or `None` if the array is empty or all null.
fn array_min_bool(array: &BooleanArray) -> Option<bool> {
    // For booleans, `current > candidate` holds exactly when `current`
    // is true and `candidate` is false; in that case the candidate wins.
    let candidate_is_smaller = |current: bool, candidate: bool| current && !candidate;
    min_max_helper(array, candidate_is_smaller)
}
/// Returns the largest non-null value in `array` (`true` orders after
/// `false`), or `None` if the array is empty or all null.
fn array_max_bool(array: &BooleanArray) -> Option<bool> {
    // For booleans, `current < candidate` holds exactly when `current`
    // is false and `candidate` is true; in that case the candidate wins.
    let candidate_is_larger = |current: bool, candidate: bool| !current && candidate;
    min_max_helper(array, candidate_is_larger)
}
/// Trait for comparing values in arrays with their native
/// representation. This so the same comparison expression can be used
/// in the macro definitions.
@ -583,7 +541,7 @@ make_min_selector!(
bool,
DataType::Boolean,
BooleanArray,
array_min_bool,
array_min_boolean,
ScalarValue::Boolean
);
@ -618,6 +576,6 @@ make_max_selector!(
bool,
DataType::Boolean,
BooleanArray,
array_max_bool,
array_max_boolean,
ScalarValue::Boolean
);

View File

@ -28,10 +28,9 @@ use std::{
use arrow_deps::{
arrow,
arrow::{datatypes::Schema as ArrowSchema, record_batch::RecordBatch},
datafusion::logical_plan::LogicalPlan,
datafusion::prelude::ExecutionConfig,
datafusion::{
datasource::MemTable, error::DataFusionError, execution::context::ExecutionContext,
logical_plan::LogicalPlan, physical_plan::collect, prelude::ExecutionConfig,
},
};
use data_types::data::{split_lines_into_write_entry_partitions, ReplicatedWrite};
@ -596,7 +595,7 @@ impl SQLDatabase for Db {
.create_physical_plan(&plan)
.context(QueryError { query })?;
ctx.collect(plan).await.context(QueryError { query })
collect(plan).await.context(QueryError { query })
}
/// Fetch the specified table names and columns as Arrow