chore: Update datafusion (again) (#4518)

* chore: Update datafusion (again)

* refactor: Update ExecutionPlan::execute to not be async
Andrew Lamb 2022-05-05 11:43:41 -04:00 committed by GitHub
parent bc5725b1fc
commit 37c7ce793c
12 changed files with 34 additions and 52 deletions
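
The second commit tracks an upstream DataFusion API change: `ExecutionPlan::execute` is no longer an `async fn` behind `#[async_trait]` but a plain method that returns the record-batch stream directly. A minimal sketch of the two shapes, using placeholder types rather than DataFusion's real trait:

```rust
use std::sync::Arc;

// Placeholder types standing in for DataFusion's TaskContext,
// SendableRecordBatchStream and error type; the real definitions live upstream.
struct TaskContext;
struct BatchStream;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

// Old shape: an async trait method, so every operator carried #[async_trait]
// and could .await while setting up its output stream.
#[async_trait::async_trait]
trait OldExecutionPlan {
    async fn execute(&self, partition: usize, ctx: Arc<TaskContext>) -> Result<BatchStream>;
}

// New shape: a synchronous method; anything that still has to wait is pushed
// into the returned stream (or into a task that feeds it).
trait NewExecutionPlan {
    fn execute(&self, partition: usize, ctx: Arc<TaskContext>) -> Result<BatchStream>;
}
```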

Cargo.lock (generated)

@@ -1195,7 +1195,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "7.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6#b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=7304719bb4830c873af32f873ce22f205fef4c77#7304719bb4830c873af32f873ce22f205fef4c77"
 dependencies = [
  "ahash",
  "arrow",
@@ -1227,7 +1227,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "7.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6#b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=7304719bb4830c873af32f873ce22f205fef4c77#7304719bb4830c873af32f873ce22f205fef4c77"
 dependencies = [
  "arrow",
  "ordered-float 3.0.0",
@@ -1238,7 +1238,7 @@ dependencies = [
 [[package]]
 name = "datafusion-data-access"
 version = "1.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6#b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=7304719bb4830c873af32f873ce22f205fef4c77#7304719bb4830c873af32f873ce22f205fef4c77"
 dependencies = [
  "async-trait",
  "chrono",
@@ -1251,7 +1251,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "7.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6#b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=7304719bb4830c873af32f873ce22f205fef4c77#7304719bb4830c873af32f873ce22f205fef4c77"
 dependencies = [
  "ahash",
  "arrow",
@@ -1262,7 +1262,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "7.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6#b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=7304719bb4830c873af32f873ce22f205fef4c77#7304719bb4830c873af32f873ce22f205fef4c77"
 dependencies = [
  "ahash",
  "arrow",
@@ -1285,7 +1285,7 @@ dependencies = [
 [[package]]
 name = "datafusion-proto"
 version = "7.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6#b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=7304719bb4830c873af32f873ce22f205fef4c77#7304719bb4830c873af32f873ce22f205fef4c77"
 dependencies = [
  "datafusion 7.0.0",
  "prost",


@@ -9,6 +9,6 @@ description = "Re-exports datafusion at a specific version"
 # Rename to workaround doctest bug
 # Turn off optional datafusion features (e.g. don't get support for crypo functions or avro)
-upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6", default-features = false, package = "datafusion" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="b7bb2cfba13cc04a08c2f687102dd14a8dedc7b6" }
+upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="7304719bb4830c873af32f873ce22f205fef4c77", default-features = false, package = "datafusion" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="7304719bb4830c873af32f873ce22f205fef4c77" }
 workspace-hack = { path = "../workspace-hack"}


@@ -277,7 +277,7 @@ pub async fn test_execute_partition(
 ) -> SendableRecordBatchStream {
     let session_ctx = SessionContext::new();
     let task_ctx = Arc::new(TaskContext::from(&session_ctx));
-    plan.execute(partition, task_ctx).await.unwrap()
+    plan.execute(partition, task_ctx).unwrap()
 }
 
 /// Execute the specified partition of the [ExecutionPlan] with a
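
For callers, the only mechanical change is dropping the `.await` on `execute`; the query still runs asynchronously, but only once the returned stream is polled. A hedged sketch of driving one partition to completion, assuming the module paths of this DataFusion revision (`SessionContext`, `TaskContext`, and the `physical_plan::common::collect` helper):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::{common, ExecutionPlan};

/// Sketch: execute one partition of an already-built physical plan and
/// collect its output.
async fn run_partition(plan: Arc<dyn ExecutionPlan>, partition: usize) -> Result<()> {
    let session_ctx = SessionContext::new();
    let task_ctx = Arc::new(TaskContext::from(&session_ctx));

    // execute() is now synchronous and only hands back the stream ...
    let stream = plan.execute(partition, task_ctx)?;

    // ... the per-batch work happens as the stream is polled.
    let batches = common::collect(stream).await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```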


@@ -198,7 +198,6 @@ impl<T> std::fmt::Debug for SystemTableExecutionPlan<T> {
     }
 }
 
-#[async_trait]
 impl<T: IoxSystemTable + 'static> ExecutionPlan for SystemTableExecutionPlan<T> {
     fn as_any(&self) -> &dyn Any {
         self
@@ -226,7 +225,7 @@ impl<T: IoxSystemTable + 'static> ExecutionPlan for SystemTableExecutionPlan<T>
         unimplemented!()
     }
 
-    async fn execute(
+    fn execute(
         &self,
         _partition: usize,
         context: Arc<TaskContext>,


@@ -133,7 +133,6 @@ impl<T> std::fmt::Debug for SystemTableExecutionPlan<T> {
     }
 }
 
-#[async_trait]
 impl<T: IoxSystemTable + 'static> ExecutionPlan for SystemTableExecutionPlan<T> {
     fn as_any(&self) -> &dyn Any {
         self
@@ -161,7 +160,7 @@ impl<T: IoxSystemTable + 'static> ExecutionPlan for SystemTableExecutionPlan<T>
         unimplemented!()
     }
 
-    async fn execute(
+    fn execute(
         &self,
         _partition: usize,
         context: Arc<TaskContext>,


@@ -358,7 +358,7 @@ impl IOxSessionContext {
         let task_context = Arc::new(TaskContext::from(self.inner()));
 
         self.run(async move {
-            let stream = physical_plan.execute(partition, task_context).await?;
+            let stream = physical_plan.execute(partition, task_context)?;
             let stream = TracedStream::new(stream, span, physical_plan);
             Ok(Box::pin(stream) as _)
         })


@@ -42,8 +42,6 @@ use std::{
     sync::Arc,
 };
 
-use async_trait::async_trait;
-
 use arrow::{
     array::{new_empty_array, StringArray},
     datatypes::{DataType, Field, Schema, SchemaRef},
@@ -195,7 +193,6 @@ impl Debug for NonNullCheckerExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for NonNullCheckerExec {
     fn as_any(&self) -> &(dyn std::any::Any + 'static) {
         self
@@ -246,7 +243,7 @@ impl ExecutionPlan for NonNullCheckerExec {
     }
 
     /// Execute one partition and return an iterator over RecordBatch
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -260,7 +257,7 @@ impl ExecutionPlan for NonNullCheckerExec {
         }
 
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-        let input_stream = self.input.execute(partition, context).await?;
+        let input_stream = self.input.execute(partition, context)?;
 
         let (tx, rx) = mpsc::channel(1);
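
The operator-side pattern is unchanged apart from the two dropped `.await`s: the input stream is obtained synchronously, the per-batch work moves into a spawned task, and the receiving half of a channel is returned as the output stream. A rough, generic sketch of that shape, using plain tokio/futures types (and the `tokio-stream` crate) as stand-ins for the IOx helpers such as `AdapterStream`:

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

/// Sketch of the execute() shape used by operators like NonNullCheckerExec:
/// grab the input stream synchronously, transform it in a spawned task, and
/// return the channel's receiving side as the output stream.
fn execute_like(
    mut input: impl Stream<Item = String> + Send + Unpin + 'static,
) -> impl Stream<Item = String> {
    let (tx, rx) = mpsc::channel(1);

    // The actual work runs here, after execute() has already returned.
    tokio::spawn(async move {
        while let Some(batch) = input.next().await {
            // Stand-in for the real per-RecordBatch transformation.
            if tx.send(format!("checked: {batch}")).await.is_err() {
                break; // receiver side was dropped
            }
        }
    });

    ReceiverStream::new(rx)
}
```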


@@ -312,7 +312,6 @@ fn get_timestamps(metrics: &MetricsSet) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) {
 
 #[cfg(test)]
 mod tests {
-    use async_trait::async_trait;
     use chrono::TimeZone;
     use datafusion::{
         execution::context::TaskContext,
@@ -619,7 +618,6 @@ mod tests {
         }
     }
 
-    #[async_trait]
     impl ExecutionPlan for TestExec {
         fn as_any(&self) -> &dyn std::any::Any {
             self
@@ -648,7 +646,7 @@ mod tests {
             unimplemented!()
         }
 
-        async fn execute(
+        fn execute(
             &self,
             _partition: usize,
             _context: Arc<TaskContext>,


@@ -25,8 +25,6 @@ use std::{
     sync::Arc,
 };
 
-use async_trait::async_trait;
-
 use arrow::{
     array::StringArray,
     datatypes::{DataType, Field, Schema, SchemaRef},
@@ -166,7 +164,6 @@ impl Debug for SchemaPivotExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for SchemaPivotExec {
     fn as_any(&self) -> &(dyn std::any::Any + 'static) {
         self
@@ -216,7 +213,7 @@ impl ExecutionPlan for SchemaPivotExec {
     }
 
     /// Execute one partition and return an iterator over RecordBatch
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -232,7 +229,7 @@ impl ExecutionPlan for SchemaPivotExec {
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         let input_schema = self.input.schema();
-        let input_stream = self.input.execute(partition, context).await?;
+        let input_stream = self.input.execute(partition, context)?;
 
         // the operation performed in a separate task which is
         // then sent via a channel to the output


@@ -6,8 +6,6 @@ use std::{
     sync::Arc,
 };
 
-use async_trait::async_trait;
-
 use arrow::{
     array::{Array, ArrayRef, BooleanArray},
     compute::{self, filter_record_batch},
@@ -32,7 +30,8 @@ use datafusion::{
 use datafusion_util::AdapterStream;
 use futures::StreamExt;
 use observability_deps::tracing::*;
-use tokio::sync::{mpsc::UnboundedSender, Mutex};
+use parking_lot::Mutex;
+use tokio::sync::mpsc::UnboundedSender;
 
 /// Implements stream splitting described in `make_stream_split`
 ///
@@ -145,7 +144,6 @@ impl Debug for StreamSplitExec {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for StreamSplitExec {
     fn as_any(&self) -> &(dyn std::any::Any + 'static) {
         self
@@ -194,15 +192,15 @@ impl ExecutionPlan for StreamSplitExec {
     ///
     /// * partition 0 are the rows for which the split_expr evaluates to true
     /// * partition 1 are the rows for which the split_expr does not evaluate to true (e.g. Null or false)
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         debug!(partition, "Start SplitExec::execute");
-        self.start_if_needed(context).await?;
+        self.start_if_needed(context)?;
 
-        let mut state = self.state.lock().await;
+        let mut state = self.state.lock();
         match &mut (*state) {
             State::New => panic!("should have been initialized"),
             State::Running { stream0, stream1 } => {
@@ -241,8 +239,8 @@ impl ExecutionPlan for StreamSplitExec {
 
 impl StreamSplitExec {
     /// if in State::New, sets up the output running and sets self.state --> `Running`
-    async fn start_if_needed(&self, context: Arc<TaskContext>) -> Result<()> {
-        let mut state = self.state.lock().await;
+    fn start_if_needed(&self, context: Arc<TaskContext>) -> Result<()> {
+        let mut state = self.state.lock();
         if matches!(*state, State::Running { .. }) {
             return Ok(());
         }
@@ -258,7 +256,7 @@ impl StreamSplitExec {
         trace!("Setting up SplitStreamExec state");
 
-        let input_stream = self.input.execute(0, context).await?;
+        let input_stream = self.input.execute(0, context)?;
         let (tx0, rx0) = tokio::sync::mpsc::unbounded_channel();
         let (tx1, rx1) = tokio::sync::mpsc::unbounded_channel();
         let split_expr = Arc::clone(&self.split_expr);
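
The switch from `tokio::sync::Mutex` to `parking_lot::Mutex` in StreamSplitExec follows directly from the signature change: a synchronous `execute` cannot `.await` a lock, so the shared state is guarded by a blocking mutex that is only held for short, non-async critical sections. A small illustrative sketch (the `State` enum and struct names here are stand-ins, not the real ones):

```rust
use std::sync::Arc;

use parking_lot::Mutex;

// Illustrative stand-in for StreamSplitExec's internal state machine.
enum State {
    New,
    Running { partitions_started: usize },
}

struct SplitLikeExec {
    state: Arc<Mutex<State>>,
}

impl SplitLikeExec {
    // With execute() no longer async there is nowhere to .await a tokio lock,
    // so the guard is taken synchronously and dropped before any async work.
    fn start_if_needed(&self) {
        let mut state = self.state.lock();
        if matches!(*state, State::Running { .. }) {
            return;
        }
        // ... set up output channels / spawn the splitting task here ...
        *state = State::Running { partitions_started: 0 };
    }
}
```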


@@ -8,7 +8,6 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use async_trait::async_trait;
 use datafusion_util::{watch::watch_task, AdapterStream};
 
 pub use self::algo::RecordBatchDeduplicator;
@@ -140,7 +139,6 @@ impl DeduplicateMetrics {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for DeduplicateExec {
     fn as_any(&self) -> &dyn std::any::Any {
         self
@@ -183,7 +181,7 @@ impl ExecutionPlan for DeduplicateExec {
         Ok(Arc::new(Self::new(input, self.sort_keys.clone())))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -197,7 +195,7 @@ impl ExecutionPlan for DeduplicateExec {
         }
 
         let deduplicate_metrics = DeduplicateMetrics::new(&self.metrics, partition);
-        let input_stream = self.input.execute(0, context).await?;
+        let input_stream = self.input.execute(0, context)?;
 
         // the deduplication is performed in a separate task which is
         // then sent via a channel to the output
@@ -1119,7 +1117,6 @@ mod test {
         batches: Vec<ArrowResult<RecordBatch>>,
     }
 
-    #[async_trait]
     impl ExecutionPlan for DummyExec {
         fn as_any(&self) -> &dyn std::any::Any {
             self
@@ -1148,7 +1145,7 @@ mod test {
             unimplemented!()
         }
 
-        async fn execute(
+        fn execute(
             &self,
             partition: usize,
             _context: Arc<TaskContext>,
@@ -1157,19 +1154,19 @@ mod test {
             debug!(partition, "Start DummyExec::execute");
 
-            // ensure there is space to queue up the channel
-            let (tx, rx) = mpsc::channel(self.batches.len());
+            // queue them all up
+            let (tx, rx) = mpsc::unbounded_channel();
 
             // queue up all the results
             for r in &self.batches {
                 match r {
-                    Ok(batch) => tx.send(Ok(batch.clone())).await.unwrap(),
-                    Err(e) => tx.send(Err(clone_error(e))).await.unwrap(),
+                    Ok(batch) => tx.send(Ok(batch.clone())).unwrap(),
+                    Err(e) => tx.send(Err(clone_error(e))).unwrap(),
                 }
             }
 
             debug!(partition, "End DummyExec::execute");
 
-            Ok(AdapterStream::adapt(self.schema(), rx))
+            Ok(AdapterStream::adapt_unbounded(self.schema(), rx))
         }
 
         fn statistics(&self) -> Statistics {
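
The test-only `DummyExec` change is the same constraint in miniature: sending on a bounded `tokio::sync::mpsc` channel is an async operation, so once `execute` is synchronous the batches are pushed through an unbounded channel instead (`AdapterStream::adapt_unbounded` being the matching IOx wrapper). A small sketch of the difference, using plain strings in place of record batches:

```rust
use tokio::sync::mpsc;

/// Unbounded senders work from non-async code: send() returns immediately
/// instead of awaiting channel capacity, which is what a sync execute() needs.
fn queue_results_sync(results: Vec<String>) -> mpsc::UnboundedReceiver<String> {
    let (tx, rx) = mpsc::unbounded_channel();
    for r in results {
        tx.send(r).unwrap(); // only fails if the receiver was dropped
    }
    rx
}

/// The bounded equivalent has to await when the channel is full, which is why
/// it was only usable while execute() itself was still async.
async fn queue_results_async(results: Vec<String>) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel(results.len().max(1));
    for r in results {
        tx.send(r).await.unwrap();
    }
    rx
}
```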


@@ -20,8 +20,6 @@ use schema::Schema;
 use crate::{exec::IOxSessionContext, QueryChunk};
 use predicate::Predicate;
 
-use async_trait::async_trait;
-
 use super::adapter::SchemaAdapterStream;
 
 /// Implements the DataFusion physical plan interface
@@ -62,7 +60,6 @@ impl IOxReadFilterNode {
     }
 }
 
-#[async_trait]
 impl ExecutionPlan for IOxReadFilterNode {
     fn as_any(&self) -> &dyn std::any::Any {
         self
@@ -108,7 +105,7 @@ impl ExecutionPlan for IOxReadFilterNode {
         Ok(Arc::new(new_self))
     }
 
-    async fn execute(
+    fn execute(
         &self,
         partition: usize,
         _context: Arc<TaskContext>,