diff --git a/Cargo.lock b/Cargo.lock index eb03efb30c..5d629855fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1191,7 +1191,7 @@ dependencies = [ [[package]] name = "datafusion" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=61c38b7114e802f9f289bf5364a031395f5799a6#61c38b7114e802f9f289bf5364a031395f5799a6" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=0aa050a1a02438c67bb85f7e95fde521e80c640b#0aa050a1a02438c67bb85f7e95fde521e80c640b" dependencies = [ "ahash 0.8.0", "arrow", @@ -1235,7 +1235,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=61c38b7114e802f9f289bf5364a031395f5799a6#61c38b7114e802f9f289bf5364a031395f5799a6" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=0aa050a1a02438c67bb85f7e95fde521e80c640b#0aa050a1a02438c67bb85f7e95fde521e80c640b" dependencies = [ "arrow", "object_store", @@ -1247,18 +1247,19 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=61c38b7114e802f9f289bf5364a031395f5799a6#61c38b7114e802f9f289bf5364a031395f5799a6" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=0aa050a1a02438c67bb85f7e95fde521e80c640b#0aa050a1a02438c67bb85f7e95fde521e80c640b" dependencies = [ "ahash 0.8.0", "arrow", "datafusion-common", + "log", "sqlparser", ] [[package]] name = "datafusion-optimizer" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=61c38b7114e802f9f289bf5364a031395f5799a6#61c38b7114e802f9f289bf5364a031395f5799a6" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=0aa050a1a02438c67bb85f7e95fde521e80c640b#0aa050a1a02438c67bb85f7e95fde521e80c640b" dependencies = [ "arrow", "async-trait", @@ -1273,7 +1274,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=61c38b7114e802f9f289bf5364a031395f5799a6#61c38b7114e802f9f289bf5364a031395f5799a6" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=0aa050a1a02438c67bb85f7e95fde521e80c640b#0aa050a1a02438c67bb85f7e95fde521e80c640b" dependencies = [ "ahash 0.8.0", "arrow", @@ -1297,7 +1298,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=61c38b7114e802f9f289bf5364a031395f5799a6#61c38b7114e802f9f289bf5364a031395f5799a6" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=0aa050a1a02438c67bb85f7e95fde521e80c640b#0aa050a1a02438c67bb85f7e95fde521e80c640b" dependencies = [ "arrow", "datafusion 13.0.0", @@ -1310,7 +1311,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=61c38b7114e802f9f289bf5364a031395f5799a6#61c38b7114e802f9f289bf5364a031395f5799a6" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=0aa050a1a02438c67bb85f7e95fde521e80c640b#0aa050a1a02438c67bb85f7e95fde521e80c640b" dependencies = [ "arrow", "datafusion-common", @@ -1321,7 +1322,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=61c38b7114e802f9f289bf5364a031395f5799a6#61c38b7114e802f9f289bf5364a031395f5799a6" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=0aa050a1a02438c67bb85f7e95fde521e80c640b#0aa050a1a02438c67bb85f7e95fde521e80c640b" dependencies = [ "arrow", "datafusion-common", diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 44d5d1e1a0..34e0dad011 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -6,7 +6,7 @@ use data_types::{ CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId, SequenceNumber, TableSchema, TimestampMinMax, }; -use datafusion::{error::DataFusionError, logical_plan::LogicalPlan}; +use datafusion::{error::DataFusionError, logical_expr::LogicalPlan}; use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt}; use iox_catalog::interface::Catalog; use iox_query::{ diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 58c154ce11..ec079b6cfd 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,6 +9,6 @@ description = "Re-exports datafusion at a specific version" # Rename to workaround doctest bug # Turn off optional datafusion features (e.g. don't get support for crypto functions or avro) -upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="61c38b7114e802f9f289bf5364a031395f5799a6", default-features = false, package = "datafusion" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="61c38b7114e802f9f289bf5364a031395f5799a6" } +upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="0aa050a1a02438c67bb85f7e95fde521e80c640b", default-features = false, package = "datafusion" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="0aa050a1a02438c67bb85f7e95fde521e80c640b" } workspace-hack = { path = "../workspace-hack"} diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs index 0868183255..b9302303c0 100644 --- a/datafusion_util/src/lib.rs +++ b/datafusion_util/src/lib.rs @@ -22,18 +22,18 @@ use datafusion::arrow::datatypes::DataType; use datafusion::common::DataFusionError; use datafusion::datasource::MemTable; use datafusion::execution::context::TaskContext; +use datafusion::logical_expr::Operator; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_plan::common::SizedRecordBatchStream; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; use datafusion::physical_plan::{collect, EmptyRecordBatchStream, ExecutionPlan}; -use datafusion::prelude::SessionContext; +use datafusion::prelude::{col, lit, Expr, SessionContext}; use datafusion::{ arrow::{ datatypes::{Schema, SchemaRef}, error::Result as ArrowResult, record_batch::RecordBatch, }, - logical_plan::{col, lit, Expr, Operator}, physical_plan::{RecordBatchStream, SendableRecordBatchStream}, scalar::ScalarValue, }; diff --git a/generated_types/src/ingester.rs b/generated_types/src/ingester.rs index d3c3da0ffb..109417b27a 100644 --- a/generated_types/src/ingester.rs +++ b/generated_types/src/ingester.rs @@ -1,8 +1,6 @@ use crate::{google::FieldViolation, influxdata::iox::ingester::v1 as proto}; use data_types::TimestampRange; -use datafusion::{ - common::DataFusionError, datafusion_proto::bytes::Serializeable, logical_plan::Expr, -}; +use datafusion::{common::DataFusionError, datafusion_proto::bytes::Serializeable, prelude::Expr}; use predicate::{Predicate, ValueExpr}; use prost::Message; use snafu::{ResultExt, Snafu}; diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index 59996f94cf..b1f2338a1c 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -351,8 +351,8 @@ mod tests { use arrow_util::assert_batches_sorted_eq; use assert_matches::assert_matches; use datafusion::{ - logical_plan::{col, lit}, physical_plan::RecordBatchStream, + prelude::{col, lit}, }; use futures::TryStreamExt; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; diff --git a/iox_query/src/exec.rs b/iox_query/src/exec.rs index 53d76827aa..42ed5012d2 100644 --- a/iox_query/src/exec.rs +++ b/iox_query/src/exec.rs @@ -22,8 +22,8 @@ use datafusion::{ context::SessionState, runtime_env::{RuntimeConfig, RuntimeEnv}, }, - logical_expr::Extension, - logical_plan::{normalize_col, Expr, LogicalPlan}, + logical_expr::{expr_rewriter::normalize_col, Extension}, + logical_expr::{Expr, LogicalPlan}, prelude::SessionContext, }; @@ -270,8 +270,8 @@ mod tests { datatypes::{DataType, Field, Schema, SchemaRef}, }; use datafusion::{ - datasource::MemTable, - logical_plan::{provider_as_source, LogicalPlanBuilder}, + datasource::{provider_as_source, MemTable}, + logical_expr::LogicalPlanBuilder, }; use stringset::StringSet; diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 26fca44620..886eb7c3e4 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -15,7 +15,7 @@ use datafusion::{ context::{QueryPlanner, SessionState, TaskContext}, runtime_env::RuntimeEnv, }, - logical_plan::{LogicalPlan, UserDefinedLogicalNode}, + logical_expr::{LogicalPlan, UserDefinedLogicalNode}, physical_plan::{ coalesce_partitions::CoalescePartitionsExec, displayable, diff --git a/iox_query/src/exec/non_null_checker.rs b/iox_query/src/exec/non_null_checker.rs index 9f1384e05f..ce6ed0a0f4 100644 --- a/iox_query/src/exec/non_null_checker.rs +++ b/iox_query/src/exec/non_null_checker.rs @@ -49,9 +49,10 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion::{ + common::{DFSchemaRef, ToDFSchema}, error::{DataFusionError as Error, Result}, execution::context::TaskContext, - logical_plan::{DFSchemaRef, Expr, LogicalPlan, ToDFSchema, UserDefinedLogicalNode}, + logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}, physical_plan::{ expressions::PhysicalSortExpr, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, diff --git a/iox_query/src/exec/schema_pivot.rs b/iox_query/src/exec/schema_pivot.rs index 1f51036f05..5d2a2696ae 100644 --- a/iox_query/src/exec/schema_pivot.rs +++ b/iox_query/src/exec/schema_pivot.rs @@ -32,9 +32,10 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion::{ + common::{DFSchemaRef, ToDFSchema}, error::{DataFusionError as Error, Result}, execution::context::TaskContext, - logical_plan::{DFSchemaRef, Expr, LogicalPlan, ToDFSchema, UserDefinedLogicalNode}, + logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}, physical_plan::{ expressions::PhysicalSortExpr, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput}, diff --git a/iox_query/src/exec/split.rs b/iox_query/src/exec/split.rs index f13e867dc9..efe0212fa7 100644 --- a/iox_query/src/exec/split.rs +++ b/iox_query/src/exec/split.rs @@ -61,9 +61,10 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion::{ + common::DFSchemaRef, error::{DataFusionError, Result}, execution::context::TaskContext, - logical_plan::{DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode}, + logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}, physical_plan::{ expressions::PhysicalSortExpr, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput}, @@ -519,8 +520,8 @@ mod tests { use arrow::array::{Int64Array, StringArray}; use arrow_util::assert_batches_sorted_eq; use datafusion::{ - logical_plan::{col, lit}, physical_plan::memory::MemoryExec, + prelude::{col, lit}, }; use datafusion_util::test_collect_partition; diff --git a/iox_query/src/frontend/common.rs b/iox_query/src/frontend/common.rs index 11f0dd2166..18842cfb57 100644 --- a/iox_query/src/frontend/common.rs +++ b/iox_query/src/frontend/common.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use datafusion::logical_plan::{provider_as_source, ExprRewritable, LogicalPlanBuilder}; +use datafusion::{ + datasource::provider_as_source, + logical_expr::{expr_rewriter::ExprRewritable, LogicalPlanBuilder}, +}; use observability_deps::tracing::trace; use predicate::Predicate; use schema::{sort::SortKey, Schema}; diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index df39608611..864ec64fa3 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -16,10 +16,10 @@ use crate::{ use arrow::datatypes::DataType; use data_types::ChunkId; use datafusion::{ + common::DFSchemaRef, error::DataFusionError, - logical_expr::utils::exprlist_to_columns, - logical_plan::{col, when, DFSchemaRef, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}, - prelude::Column, + logical_expr::{utils::exprlist_to_columns, ExprSchemable, LogicalPlan, LogicalPlanBuilder}, + prelude::{col, when, Column, Expr}, }; use datafusion_util::AsExpr; use futures::{Stream, StreamExt, TryStreamExt}; @@ -1886,7 +1886,7 @@ fn cheap_chunk_first(mut chunks: Vec>) -> Vec; diff --git a/iox_query/src/plan/seriesset.rs b/iox_query/src/plan/seriesset.rs index 15b7641e7e..a158438098 100644 --- a/iox_query/src/plan/seriesset.rs +++ b/iox_query/src/plan/seriesset.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use datafusion::logical_plan::LogicalPlan; +use datafusion::logical_expr::LogicalPlan; use crate::exec::field::FieldColumns; diff --git a/iox_query/src/plan/stringset.rs b/iox_query/src/plan/stringset.rs index 35c764daf8..87e8510a35 100644 --- a/iox_query/src/plan/stringset.rs +++ b/iox_query/src/plan/stringset.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use arrow_util::util::str_iter_to_batch; -use datafusion::logical_plan::LogicalPlan; +use datafusion::logical_expr::LogicalPlan; /// The name of the column containing table names returned by a call to /// `table_names`. diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index d09534c1f7..e97eaa3d87 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -10,7 +10,6 @@ use datafusion::{ error::{DataFusionError, Result as DataFusionResult}, execution::context::SessionState, logical_expr::{TableProviderFilterPushDown, TableType}, - logical_plan::Expr, physical_plan::{ expressions::{col as physical_col, PhysicalSortExpr}, filter::FilterExec, @@ -19,6 +18,7 @@ use datafusion::{ union::UnionExec, ExecutionPlan, }, + prelude::Expr, }; use observability_deps::tracing::{debug, trace, warn}; use predicate::Predicate; diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs index 58f9440af8..66bba3d3af 100644 --- a/iox_query/src/pruning.rs +++ b/iox_query/src/pruning.rs @@ -9,8 +9,8 @@ use arrow::{ }; use data_types::{StatValues, Statistics, TableSummary}; use datafusion::{ - logical_plan::Column, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, + prelude::Column, }; use observability_deps::tracing::{debug, trace, warn}; use predicate::Predicate; @@ -250,7 +250,7 @@ fn get_aggregate(stats: StatValues, aggregate: Aggregate) -> Option { mod test { use std::sync::Arc; - use datafusion::logical_plan::{col, lit}; + use datafusion::prelude::{col, lit}; use datafusion_util::lit_dict; use predicate::Predicate; use schema::merge::SchemaMerger; diff --git a/iox_query/src/util.rs b/iox_query/src/util.rs index 5e5f7592ca..457f0233b5 100644 --- a/iox_query/src/util.rs +++ b/iox_query/src/util.rs @@ -16,19 +16,18 @@ use arrow::{ use data_types::TimestampMinMax; use datafusion::{ self, - datasource::MemTable, + common::{DFSchema, ToDFSchema}, + datasource::{provider_as_source, MemTable}, error::{DataFusionError, Result as DatafusionResult}, execution::context::ExecutionProps, - logical_plan::{ - lit, provider_as_source, DFSchema, Expr, ExprRewriter, ExprSchemable, LogicalPlan, - LogicalPlanBuilder, ToDFSchema, - }, + logical_expr::{expr_rewriter::ExprRewriter, ExprSchemable, LogicalPlan, LogicalPlanBuilder}, optimizer::expr_simplifier::{ExprSimplifier, SimplifyContext}, physical_expr::create_physical_expr, physical_plan::{ expressions::{col as physical_col, PhysicalSortExpr}, ExecutionPlan, PhysicalExpr, }, + prelude::{lit, Expr}, scalar::ScalarValue, }; @@ -278,7 +277,8 @@ pub fn compute_timenanosecond_min_max_for_one_record_batch( mod tests { use arrow::datatypes::DataType; use datafusion::{ - logical_plan::{col, lit, ExprRewritable}, + logical_expr::expr_rewriter::ExprRewritable, + prelude::{col, lit}, scalar::ScalarValue, }; use schema::builder::SchemaBuilder; diff --git a/predicate/src/delete_expr.rs b/predicate/src/delete_expr.rs index 3a751aabfa..af155bebdf 100644 --- a/predicate/src/delete_expr.rs +++ b/predicate/src/delete_expr.rs @@ -1,11 +1,10 @@ use data_types::{DeleteExpr, Op, Scalar}; +use datafusion::prelude::Expr; use snafu::{ResultExt, Snafu}; use std::ops::Deref; -pub(crate) fn expr_to_df(expr: DeleteExpr) -> datafusion::logical_plan::Expr { - use datafusion::logical_plan::Expr; - - let column = datafusion::logical_plan::Column { +pub(crate) fn expr_to_df(expr: DeleteExpr) -> Expr { + let column = datafusion::prelude::Column { relation: None, name: expr.column, }; @@ -20,15 +19,10 @@ pub(crate) fn expr_to_df(expr: DeleteExpr) -> datafusion::logical_plan::Expr { #[derive(Debug, Snafu)] pub enum DataFusionToExprError { #[snafu(display("unsupported expression: {:?}", expr))] - UnsupportedExpression { - expr: datafusion::logical_plan::Expr, - }, + UnsupportedExpression { expr: Expr }, #[snafu(display("unsupported operants: left {:?}; right {:?}", left, right))] - UnsupportedOperants { - left: datafusion::logical_plan::Expr, - right: datafusion::logical_plan::Expr, - }, + UnsupportedOperants { left: Expr, right: Expr }, #[snafu(display("cannot convert datafusion operator: {}", source))] CannotConvertDataFusionOperator { @@ -41,18 +35,13 @@ pub enum DataFusionToExprError { }, } -pub(crate) fn df_to_expr( - expr: datafusion::logical_plan::Expr, -) -> Result { +pub(crate) fn df_to_expr(expr: Expr) -> Result { match expr { - datafusion::logical_plan::Expr::BinaryExpr { left, op, right } => { + Expr::BinaryExpr { left, op, right } => { let (column, scalar) = match (left.deref(), right.deref()) { // The delete predicate parser currently only supports ``, not ``, // however this could can easily be extended to support the latter case as well. - ( - datafusion::logical_plan::Expr::Column(column), - datafusion::logical_plan::Expr::Literal(value), - ) => { + (Expr::Column(column), Expr::Literal(value)) => { let column = column.name.clone(); let scalar = df_to_scalar(value.clone()) @@ -76,10 +65,10 @@ pub(crate) fn df_to_expr( } } -pub(crate) fn op_to_df(op: Op) -> datafusion::logical_plan::Operator { +pub(crate) fn op_to_df(op: Op) -> datafusion::logical_expr::Operator { match op { - Op::Eq => datafusion::logical_plan::Operator::Eq, - Op::Ne => datafusion::logical_plan::Operator::NotEq, + Op::Eq => datafusion::logical_expr::Operator::Eq, + Op::Ne => datafusion::logical_expr::Operator::NotEq, } } @@ -88,14 +77,14 @@ pub(crate) fn op_to_df(op: Op) -> datafusion::logical_plan::Operator { pub enum DataFusionToOpError { #[snafu(display("unsupported operator: {:?}", op))] UnsupportedOperator { - op: datafusion::logical_plan::Operator, + op: datafusion::logical_expr::Operator, }, } -pub(crate) fn df_to_op(op: datafusion::logical_plan::Operator) -> Result { +pub(crate) fn df_to_op(op: datafusion::logical_expr::Operator) -> Result { match op { - datafusion::logical_plan::Operator::Eq => Ok(Op::Eq), - datafusion::logical_plan::Operator::NotEq => Ok(Op::Ne), + datafusion::logical_expr::Operator::Eq => Ok(Op::Eq), + datafusion::logical_expr::Operator::NotEq => Ok(Op::Ne), other => Err(DataFusionToOpError::UnsupportedOperator { op: other }), } } @@ -184,40 +173,32 @@ mod tests { #[test] fn test_unsupported_expression() { - let expr = datafusion::logical_plan::Expr::Not(Box::new( - datafusion::logical_plan::Expr::BinaryExpr { - left: Box::new(datafusion::logical_plan::Expr::Column( - datafusion::logical_plan::Column { - relation: None, - name: "foo".to_string(), - }, - )), - op: datafusion::logical_plan::Operator::Eq, - right: Box::new(datafusion::logical_plan::Expr::Literal( - datafusion::scalar::ScalarValue::Utf8(Some("x".to_string())), - )), - }, - )); + let expr = Expr::Not(Box::new(Expr::BinaryExpr { + left: Box::new(Expr::Column(datafusion::prelude::Column { + relation: None, + name: "foo".to_string(), + })), + op: datafusion::logical_expr::Operator::Eq, + right: Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some( + "x".to_string(), + )))), + })); let res = df_to_expr(expr); assert_contains!(res.unwrap_err().to_string(), "unsupported expression:"); } #[test] fn test_unsupported_operants() { - let expr = datafusion::logical_plan::Expr::BinaryExpr { - left: Box::new(datafusion::logical_plan::Expr::Column( - datafusion::logical_plan::Column { - relation: None, - name: "foo".to_string(), - }, - )), - op: datafusion::logical_plan::Operator::Eq, - right: Box::new(datafusion::logical_plan::Expr::Column( - datafusion::logical_plan::Column { - relation: None, - name: "bar".to_string(), - }, - )), + let expr = Expr::BinaryExpr { + left: Box::new(Expr::Column(datafusion::prelude::Column { + relation: None, + name: "foo".to_string(), + })), + op: datafusion::logical_expr::Operator::Eq, + right: Box::new(Expr::Column(datafusion::prelude::Column { + relation: None, + name: "bar".to_string(), + })), }; let res = df_to_expr(expr); assert_contains!(res.unwrap_err().to_string(), "unsupported operants:"); @@ -239,24 +220,20 @@ mod tests { #[test] fn test_unsupported_scalar_value_in_expr() { - let expr = datafusion::logical_plan::Expr::BinaryExpr { - left: Box::new(datafusion::logical_plan::Expr::Column( - datafusion::logical_plan::Column { - relation: None, - name: "foo".to_string(), - }, - )), - op: datafusion::logical_plan::Operator::Eq, - right: Box::new(datafusion::logical_plan::Expr::Literal( - datafusion::scalar::ScalarValue::List( - Some(vec![]), - Box::new(Field::new( - "field", - arrow::datatypes::DataType::Float64, - true, - )), - ), - )), + let expr = Expr::BinaryExpr { + left: Box::new(Expr::Column(datafusion::prelude::Column { + relation: None, + name: "foo".to_string(), + })), + op: datafusion::logical_expr::Operator::Eq, + right: Box::new(Expr::Literal(datafusion::scalar::ScalarValue::List( + Some(vec![]), + Box::new(Field::new( + "field", + arrow::datatypes::DataType::Float64, + true, + )), + ))), }; let res = df_to_expr(expr); assert_contains!(res.unwrap_err().to_string(), "unsupported scalar value:"); @@ -264,23 +241,21 @@ mod tests { #[test] fn test_unsupported_operator() { - let res = df_to_op(datafusion::logical_plan::Operator::Like); + let res = df_to_op(datafusion::logical_expr::Operator::Like); assert_contains!(res.unwrap_err().to_string(), "unsupported operator:"); } #[test] fn test_unsupported_operator_in_expr() { - let expr = datafusion::logical_plan::Expr::BinaryExpr { - left: Box::new(datafusion::logical_plan::Expr::Column( - datafusion::logical_plan::Column { - relation: None, - name: "foo".to_string(), - }, - )), - op: datafusion::logical_plan::Operator::Like, - right: Box::new(datafusion::logical_plan::Expr::Literal( - datafusion::scalar::ScalarValue::Utf8(Some("x".to_string())), - )), + let expr = Expr::BinaryExpr { + left: Box::new(Expr::Column(datafusion::prelude::Column { + relation: None, + name: "foo".to_string(), + })), + op: datafusion::logical_expr::Operator::Like, + right: Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Utf8(Some( + "x".to_string(), + )))), }; let res = df_to_expr(expr); assert_contains!(res.unwrap_err().to_string(), "unsupported operator:"); diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index 5e64a47ff3..b4523f273f 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -1,7 +1,8 @@ use crate::delete_expr::{df_to_expr, expr_to_df}; use chrono::DateTime; use data_types::{DeleteExpr, DeletePredicate, TimestampRange, Tombstone}; -use datafusion::logical_plan::{lit, Column, Expr, Operator}; +use datafusion::logical_expr::Operator; +use datafusion::prelude::{lit, Column, Expr}; use snafu::Snafu; use sqlparser::{ ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value}, diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index 789e0be570..68b40ee875 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -23,10 +23,10 @@ use arrow::{ use data_types::{InfluxDbType, TableSummary, TimestampRange}; use datafusion::{ error::DataFusionError, - logical_expr::{binary_expr, utils::expr_to_columns}, - logical_plan::{col, lit_timestamp_nano, Expr, Operator}, + logical_expr::{binary_expr, utils::expr_to_columns, Operator}, optimizer::utils::split_conjunction, physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, + prelude::{col, lit_timestamp_nano, Expr}, }; use datafusion_util::{make_range_expr, nullable_schema}; use observability_deps::tracing::debug; @@ -59,7 +59,7 @@ pub const EMPTY_PREDICATE: Predicate = Predicate { /// Example: /// ``` /// use predicate::Predicate; -/// use datafusion::logical_plan::{col, lit}; +/// use datafusion::prelude::{col, lit}; /// /// let p = Predicate::new() /// .with_range(1, 100) @@ -301,10 +301,7 @@ struct SummaryWrapper<'a> { } impl<'a> PruningStatistics for SummaryWrapper<'a> { - fn min_values( - &self, - column: &datafusion::logical_plan::Column, - ) -> Option { + fn min_values(&self, column: &datafusion::prelude::Column) -> Option { let col = self.summary.column(&column.name)?; let stats = &col.stats; @@ -327,10 +324,7 @@ impl<'a> PruningStatistics for SummaryWrapper<'a> { Some(array) } - fn max_values( - &self, - column: &datafusion::logical_plan::Column, - ) -> Option { + fn max_values(&self, column: &datafusion::prelude::Column) -> Option { let col = self.summary.column(&column.name)?; let stats = &col.stats; @@ -358,10 +352,7 @@ impl<'a> PruningStatistics for SummaryWrapper<'a> { 1 } - fn null_counts( - &self, - column: &datafusion::logical_plan::Column, - ) -> Option { + fn null_counts(&self, column: &datafusion::prelude::Column) -> Option { let null_count = self.summary.column(&column.name)?.stats.null_count(); Some(Arc::new(UInt64Array::from(vec![null_count]))) @@ -606,7 +597,7 @@ mod tests { use super::*; use arrow::datatypes::DataType as ArrowDataType; use data_types::{ColumnSummary, InfluxDbType, StatValues, MAX_NANO_TIME, MIN_NANO_TIME}; - use datafusion::logical_plan::{col, lit}; + use datafusion::prelude::{col, lit}; use schema::builder::SchemaBuilder; use test_helpers::maybe_start_logging; diff --git a/predicate/src/rpc_predicate.rs b/predicate/src/rpc_predicate.rs index 792396e92c..40ce9573f0 100644 --- a/predicate/src/rpc_predicate.rs +++ b/predicate/src/rpc_predicate.rs @@ -6,13 +6,13 @@ mod value_rewrite; use crate::Predicate; +use datafusion::common::{ExprSchema, ToDFSchema}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::execution::context::ExecutionProps; -use datafusion::logical_expr::lit; -use datafusion::logical_plan::{ - Column, Expr, ExprRewritable, ExprSchema, ExprSchemable, SimplifyInfo, ToDFSchema, -}; -use datafusion::optimizer::expr_simplifier::ExprSimplifier; +use datafusion::logical_expr::expr_rewriter::ExprRewritable; +use datafusion::logical_expr::ExprSchemable; +use datafusion::optimizer::expr_simplifier::{ExprSimplifier, SimplifyInfo}; +use datafusion::prelude::{lit, Column, Expr}; use observability_deps::tracing::{debug, trace}; use schema::Schema; use std::collections::BTreeSet; @@ -324,7 +324,7 @@ mod tests { use super::*; use arrow::datatypes::DataType; use datafusion::{ - logical_plan::{col, lit}, + prelude::{col, lit}, scalar::ScalarValue, }; use datafusion_util::lit_dict; diff --git a/predicate/src/rpc_predicate/column_rewrite.rs b/predicate/src/rpc_predicate/column_rewrite.rs index 7a29331fca..a6c8d55f3f 100644 --- a/predicate/src/rpc_predicate/column_rewrite.rs +++ b/predicate/src/rpc_predicate/column_rewrite.rs @@ -1,7 +1,8 @@ use std::sync::Arc; use datafusion::{ - error::Result as DataFusionResult, logical_plan::ExprRewriter, prelude::*, scalar::ScalarValue, + error::Result as DataFusionResult, logical_expr::expr_rewriter::ExprRewriter, prelude::*, + scalar::ScalarValue, }; use schema::Schema; @@ -46,7 +47,7 @@ impl ExprRewriter for MissingColumnRewriter { #[cfg(test)] mod tests { - use datafusion::{arrow::datatypes::DataType, logical_plan::ExprRewritable}; + use datafusion::{arrow::datatypes::DataType, logical_expr::expr_rewriter::ExprRewritable}; use schema::SchemaBuilder; use super::*; diff --git a/predicate/src/rpc_predicate/field_rewrite.rs b/predicate/src/rpc_predicate/field_rewrite.rs index 7483f2bc6f..69434eca14 100644 --- a/predicate/src/rpc_predicate/field_rewrite.rs +++ b/predicate/src/rpc_predicate/field_rewrite.rs @@ -4,11 +4,13 @@ use super::FIELD_COLUMN_NAME; use arrow::array::{as_boolean_array, as_string_array, ArrayRef, StringArray}; use arrow::compute::kernels; use arrow::record_batch::RecordBatch; +use datafusion::common::DFSchema; use datafusion::error::{DataFusionError, Result as DataFusionResult}; -use datafusion::logical_plan::{lit, DFSchema, Expr, ExprVisitable, ExpressionVisitor, Recursion}; +use datafusion::logical_expr::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_expr::execution_props::ExecutionProps; use datafusion::physical_plan::ColumnarValue; +use datafusion::prelude::{lit, Expr}; use datafusion_util::disassemble_conjuct; use schema::Schema; use std::sync::Arc; @@ -245,7 +247,7 @@ impl ExpressionVisitor for ColumnReferencesFinder { mod tests { use super::*; use arrow::datatypes::DataType; - use datafusion::logical_plan::{case, col}; + use datafusion::prelude::{case, col}; use schema::builder::SchemaBuilder; use test_helpers::assert_contains; diff --git a/predicate/src/rpc_predicate/measurement_rewrite.rs b/predicate/src/rpc_predicate/measurement_rewrite.rs index 0fea57b556..e5fcf2a0a6 100644 --- a/predicate/src/rpc_predicate/measurement_rewrite.rs +++ b/predicate/src/rpc_predicate/measurement_rewrite.rs @@ -1,5 +1,6 @@ use datafusion::error::Result as DataFusionResult; -use datafusion::logical_plan::{lit, Column, Expr, ExprRewritable, ExprRewriter}; +use datafusion::logical_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; +use datafusion::prelude::{lit, Column, Expr}; use super::MEASUREMENT_COLUMN_NAME; diff --git a/predicate/src/rpc_predicate/rewrite.rs b/predicate/src/rpc_predicate/rewrite.rs index a5d54dceaf..b3facabcea 100644 --- a/predicate/src/rpc_predicate/rewrite.rs +++ b/predicate/src/rpc_predicate/rewrite.rs @@ -1,7 +1,12 @@ use datafusion::{ error::Result, - logical_expr::{binary_expr, expr::Case}, - logical_plan::{Expr, ExprRewritable, ExprRewriter, Operator}, + logical_expr::{ + binary_expr, + expr::Case, + expr_rewriter::{ExprRewritable, ExprRewriter}, + Operator, + }, + prelude::Expr, }; /// Special purpose `Expr` rewrite rules for IOx @@ -279,7 +284,7 @@ mod tests { use std::ops::Add; use super::*; - use datafusion::logical_plan::{case, col, lit, when}; + use datafusion::prelude::{case, col, lit, when}; #[test] fn test_fold_case_expr() { diff --git a/predicate/src/rpc_predicate/value_rewrite.rs b/predicate/src/rpc_predicate/value_rewrite.rs index f9b49ac97c..ea2bc6b582 100644 --- a/predicate/src/rpc_predicate/value_rewrite.rs +++ b/predicate/src/rpc_predicate/value_rewrite.rs @@ -1,5 +1,6 @@ use datafusion::error::Result as DataFusionResult; -use datafusion::logical_plan::{lit, Expr, ExprRewritable, ExprRewriter}; +use datafusion::logical_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; +use datafusion::prelude::{lit, Expr}; use crate::ValueExpr; @@ -38,7 +39,7 @@ mod tests { use super::*; use crate::rpc_predicate::VALUE_COLUMN_NAME; - use datafusion::logical_plan::col; + use datafusion::prelude::col; #[test] fn test_field_value_rewriter() { diff --git a/querier/src/system_tables/mod.rs b/querier/src/system_tables/mod.rs index 5acd67ddb2..55faed8429 100644 --- a/querier/src/system_tables/mod.rs +++ b/querier/src/system_tables/mod.rs @@ -12,6 +12,7 @@ use datafusion::{ expressions::PhysicalSortExpr, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }, + prelude::Expr, }; use std::{ any::Any, @@ -102,7 +103,7 @@ where _ctx: &SessionState, projection: &Option>, // It would be cool to push projection and limit down - _filters: &[datafusion::logical_plan::Expr], + _filters: &[Expr], _limit: Option, ) -> DataFusionResult> { let schema = self.table.schema(); diff --git a/querier/src/table/query_access/mod.rs b/querier/src/table/query_access/mod.rs index e16830577b..3642ed35d8 100644 --- a/querier/src/table/query_access/mod.rs +++ b/querier/src/table/query_access/mod.rs @@ -7,8 +7,8 @@ use datafusion::{ error::DataFusionError, execution::context::SessionState, logical_expr::TableProviderFilterPushDown, - logical_plan::Expr, physical_plan::ExecutionPlan, + prelude::Expr, }; use iox_query::{ exec::{ExecutorType, SessionContextIOxExt}, diff --git a/query_functions/src/group_by.rs b/query_functions/src/group_by.rs index d7ce99b3c3..ed16dddfbc 100644 --- a/query_functions/src/group_by.rs +++ b/query_functions/src/group_by.rs @@ -2,7 +2,7 @@ //! and Aggregate functions in IOx, designed to be compatible with //! InfluxDB classic -use datafusion::logical_plan::Expr; +use datafusion::prelude::Expr; use snafu::Snafu; use crate::window; @@ -81,7 +81,7 @@ pub enum WindowDuration { impl Aggregate { /// Create the appropriate DataFusion expression for this aggregate pub fn to_datafusion_expr(self, input: Expr) -> Result { - use datafusion::logical_plan::{avg, count, max, min, sum}; + use datafusion::prelude::{avg, count, max, min, sum}; match self { Self::Sum => Ok(sum(input)), Self::Count => Ok(count(input)), diff --git a/query_functions/src/lib.rs b/query_functions/src/lib.rs index 75d2d3eea3..bc8de9fff3 100644 --- a/query_functions/src/lib.rs +++ b/query_functions/src/lib.rs @@ -12,8 +12,8 @@ )] use datafusion::{ - logical_plan::{Expr, FunctionRegistry}, - prelude::lit, + execution::FunctionRegistry, + prelude::{lit, Expr}, }; use group_by::WindowDuration; use window::EncodedWindowDuration; @@ -97,7 +97,7 @@ mod test { array::{ArrayRef, StringArray, TimestampNanosecondArray}, record_batch::RecordBatch, }; - use datafusion::{assert_batches_eq, logical_plan::col}; + use datafusion::{assert_batches_eq, prelude::col}; use datafusion_util::context_with_table; use std::sync::Arc; diff --git a/query_functions/src/regex.rs b/query_functions/src/regex.rs index dc43df1a69..87a7376638 100644 --- a/query_functions/src/regex.rs +++ b/query_functions/src/regex.rs @@ -7,8 +7,8 @@ use arrow::{ use datafusion::{ error::DataFusionError, logical_expr::{ScalarFunctionImplementation, ScalarUDF, Volatility}, - logical_plan::create_udf, physical_plan::ColumnarValue, + prelude::create_udf, scalar::ScalarValue, }; use once_cell::sync::Lazy; @@ -206,8 +206,7 @@ mod test { }; use datafusion::{ error::DataFusionError, - logical_plan::{col, Expr}, - prelude::lit, + prelude::{col, lit, Expr}, }; use datafusion_util::context_with_table; use std::sync::Arc; diff --git a/query_functions/src/registry.rs b/query_functions/src/registry.rs index 8185c0665e..f5ade0b986 100644 --- a/query_functions/src/registry.rs +++ b/query_functions/src/registry.rs @@ -2,8 +2,8 @@ use std::{collections::HashSet, sync::Arc}; use datafusion::{ common::{DataFusionError, Result as DataFusionResult}, + execution::FunctionRegistry, logical_expr::{AggregateUDF, ScalarUDF}, - logical_plan::FunctionRegistry, }; use once_cell::sync::Lazy; diff --git a/query_functions/src/selectors.rs b/query_functions/src/selectors.rs index 317235e7d4..1d788c1bf5 100644 --- a/query_functions/src/selectors.rs +++ b/query_functions/src/selectors.rs @@ -571,7 +571,7 @@ mod test { record_batch::RecordBatch, util::pretty::pretty_format_batches, }; - use datafusion::{datasource::MemTable, logical_plan::Expr, prelude::*}; + use datafusion::{datasource::MemTable, prelude::*}; use schema::TIME_DATA_TIMEZONE; use super::*; diff --git a/query_tests/cases/in/pushdown.expected b/query_tests/cases/in/pushdown.expected index 3c420193ce..09faa6baff 100644 --- a/query_tests/cases/in/pushdown.expected +++ b/query_tests/cases/in/pushdown.expected @@ -135,21 +135,21 @@ | 872 | 6 | 1970-01-01T00:00:00.000000110Z | lawrence | +-------+--------+--------------------------------+-----------+ -- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000; -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Filter: CAST(restaurant.count AS Int64) AS restaurant.count > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")) AND CAST(restaurant.count AS Int64) AS restaurant.count < Int64(40000) | -| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) AS restaurant.count > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), CAST(restaurant.count AS Int64) AS restaurant.count < Int64(40000)] | -| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | -| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence AND CAST(count@0 AS Int64) < 40000 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [town != Dictionary(Int32, Utf8("tewsbury"))] | -| | | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")) AND CAST(restaurant.count AS Int64) < Int64(40000) | +| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), CAST(restaurant.count AS Int64) < Int64(40000)] | +| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | +| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | +| | CoalesceBatchesExec: target_batch_size=4096 | +| | FilterExec: CAST(count@0 AS Int64) > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence AND CAST(count@0 AS Int64) < 40000 | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [town != Dictionary(Int32, Utf8("tewsbury"))] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * from restaurant where count > 200 and count < 40000; -- Results After Sorting +-------+--------+--------------------------------+-----------+ @@ -162,21 +162,21 @@ | 872 | 6 | 1970-01-01T00:00:00.000000110Z | lawrence | +-------+--------+--------------------------------+-----------+ -- SQL: EXPLAIN SELECT * from restaurant where count > 200 and count < 40000; -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Filter: CAST(restaurant.count AS Int64) AS restaurant.count > Int64(200) AND CAST(restaurant.count AS Int64) AS restaurant.count < Int64(40000) | -| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) AS restaurant.count > Int64(200), CAST(restaurant.count AS Int64) AS restaurant.count < Int64(40000)] | -| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | -| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(count@0 AS Int64) < 40000 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate | -| | | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | Filter: CAST(restaurant.count AS Int64) > Int64(200) AND CAST(restaurant.count AS Int64) < Int64(40000) | +| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), CAST(restaurant.count AS Int64) < Int64(40000)] | +| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | +| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | +| | CoalesceBatchesExec: target_batch_size=4096 | +| | FilterExec: CAST(count@0 AS Int64) > 200 AND CAST(count@0 AS Int64) < 40000 | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate | +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * from restaurant where system > 4.0 and system < 7.0; -- Results After Sorting +-------+--------+--------------------------------+-----------+ diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs index 8d7339dafc..1c86ce12bc 100644 --- a/query_tests/src/influxrpc/field_columns.rs +++ b/query_tests/src/influxrpc/field_columns.rs @@ -1,7 +1,7 @@ use crate::scenarios::*; use arrow::datatypes::DataType; use data_types::{MAX_NANO_TIME, MIN_NANO_TIME}; -use datafusion::logical_plan::{col, lit}; +use datafusion::prelude::{col, lit}; use iox_query::{ exec::fieldlist::{Field, FieldList}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index 7f32084f46..f0bc77d64c 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -14,7 +14,7 @@ use crate::{ }, }; use datafusion::{ - logical_plan::{col, lit}, + prelude::{col, lit}, scalar::ScalarValue, }; use iox_query::frontend::influxrpc::InfluxRpcPlanner; diff --git a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index c7f23c3cd1..4d7802bc33 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -1,7 +1,7 @@ //! Tests for the Influx gRPC queries use crate::scenarios::*; use data_types::{MAX_NANO_TIME, MIN_NANO_TIME}; -use datafusion::logical_plan::{col, lit}; +use datafusion::prelude::{col, lit}; use iox_query::{ exec::stringset::{IntoStringSet, StringSetRef}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index a15672fde0..d98deda96e 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -1,6 +1,6 @@ use crate::scenarios::*; use data_types::{MAX_NANO_TIME, MIN_NANO_TIME}; -use datafusion::logical_plan::{col, lit}; +use datafusion::prelude::{col, lit}; use iox_query::{ exec::stringset::{IntoStringSet, StringSetRef}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/query_tests/src/influxrpc/tag_values.rs b/query_tests/src/influxrpc/tag_values.rs index 0e9e2c532a..ffd85407f6 100644 --- a/query_tests/src/influxrpc/tag_values.rs +++ b/query_tests/src/influxrpc/tag_values.rs @@ -1,4 +1,4 @@ -use datafusion::logical_plan::{col, lit}; +use datafusion::prelude::{col, lit}; use iox_query::{ exec::stringset::{IntoStringSet, StringSetRef}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/query_tests/src/influxrpc/util.rs b/query_tests/src/influxrpc/util.rs index 9d41e5e122..d91808d143 100644 --- a/query_tests/src/influxrpc/util.rs +++ b/query_tests/src/influxrpc/util.rs @@ -1,5 +1,5 @@ use datafusion::error::DataFusionError; -use datafusion::logical_plan::{col, lit, when, Expr}; +use datafusion::prelude::{col, lit, when, Expr}; use iox_query::exec::IOxSessionContext; use iox_query::plan::seriesset::SeriesSetPlans; diff --git a/read_buffer/src/column/cmp.rs b/read_buffer/src/column/cmp.rs index 1fd5ad616f..c967053510 100644 --- a/read_buffer/src/column/cmp.rs +++ b/read_buffer/src/column/cmp.rs @@ -45,17 +45,17 @@ impl TryFrom<&str> for Operator { } } -impl TryFrom<&datafusion::logical_plan::Operator> for Operator { +impl TryFrom<&datafusion::logical_expr::Operator> for Operator { type Error = String; - fn try_from(op: &datafusion::logical_plan::Operator) -> Result { + fn try_from(op: &datafusion::logical_expr::Operator) -> Result { match op { - datafusion::logical_plan::Operator::Eq => Ok(Self::Equal), - datafusion::logical_plan::Operator::NotEq => Ok(Self::NotEqual), - datafusion::logical_plan::Operator::Lt => Ok(Self::LT), - datafusion::logical_plan::Operator::LtEq => Ok(Self::LTE), - datafusion::logical_plan::Operator::Gt => Ok(Self::GT), - datafusion::logical_plan::Operator::GtEq => Ok(Self::GTE), + datafusion::logical_expr::Operator::Eq => Ok(Self::Equal), + datafusion::logical_expr::Operator::NotEq => Ok(Self::NotEqual), + datafusion::logical_expr::Operator::Lt => Ok(Self::LT), + datafusion::logical_expr::Operator::LtEq => Ok(Self::LTE), + datafusion::logical_expr::Operator::Gt => Ok(Self::GT), + datafusion::logical_expr::Operator::GtEq => Ok(Self::GTE), v => Err(format!("unsupported operator {:?}", v)), } } diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index f49210bddc..9aa8b1a172 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -26,7 +26,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion::{ - logical_plan::Expr as DfExpr, logical_plan::Operator as DFOperator, + logical_expr::Operator as DFOperator, prelude::Expr as DfExpr, scalar::ScalarValue as DFScalarValue, }; use std::num::NonZeroU64; @@ -4033,7 +4033,7 @@ west,host-c,pro,10,6 assert_eq!(result, to_map(vec![])); } - use datafusion::logical_plan::{col, lit, lit_timestamp_nano}; + use datafusion::prelude::{col, lit, lit_timestamp_nano}; use std::convert::TryFrom; #[test] diff --git a/service_grpc_influxrpc/src/expr.rs b/service_grpc_influxrpc/src/expr.rs index 1a7dc0b602..130e1ef7b3 100644 --- a/service_grpc_influxrpc/src/expr.rs +++ b/service_grpc_influxrpc/src/expr.rs @@ -10,13 +10,8 @@ use std::string::FromUtf8Error; use std::{convert::TryFrom, fmt}; use datafusion::error::DataFusionError; -use datafusion::logical_expr::binary_expr; -use datafusion::logical_plan::when; -use datafusion::{ - logical_plan::{Expr, Operator}, - prelude::*, - scalar::ScalarValue, -}; +use datafusion::logical_expr::{binary_expr, Operator}; +use datafusion::{prelude::*, scalar::ScalarValue}; use generated_types::{ aggregate::AggregateType as RPCAggregateType, node::Comparison as RPCComparison, node::Logical as RPCLogical, node::Value as RPCValue, read_group_request::Group as RPCGroup, diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index 3989a75c4c..caca45feca 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -1611,7 +1611,7 @@ where mod tests { use super::*; use data_types::ChunkId; - use datafusion::logical_plan::{col, Expr}; + use datafusion::prelude::{col, Expr}; use datafusion_util::lit_dict; use futures::Future; use generated_types::{i_ox_testing_client::IOxTestingClient, tag_key_predicate::Value};