chore: Update datafusion (#8515)

* chore: Update datafusion

* fix: update for API

* fix: Verify unsupported statements, with tests

* fix: update tests

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-08-21 13:49:21 -04:00 committed by GitHub
parent 3e98f7ea5c
commit 967aef0e9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 151 additions and 139 deletions

32
Cargo.lock generated
View File

@ -1389,8 +1389,8 @@ dependencies = [
[[package]]
name = "datafusion"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=7a5354fe5908b8ac7db163d6c484dbf1d85a142e#7a5354fe5908b8ac7db163d6c484dbf1d85a142e"
version = "29.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
dependencies = [
"ahash",
"arrow",
@ -1437,8 +1437,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=7a5354fe5908b8ac7db163d6c484dbf1d85a142e#7a5354fe5908b8ac7db163d6c484dbf1d85a142e"
version = "29.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
dependencies = [
"arrow",
"arrow-array",
@ -1451,8 +1451,8 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=7a5354fe5908b8ac7db163d6c484dbf1d85a142e#7a5354fe5908b8ac7db163d6c484dbf1d85a142e"
version = "29.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
dependencies = [
"arrow",
"dashmap",
@ -1470,8 +1470,8 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=7a5354fe5908b8ac7db163d6c484dbf1d85a142e#7a5354fe5908b8ac7db163d6c484dbf1d85a142e"
version = "29.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
dependencies = [
"ahash",
"arrow",
@ -1484,8 +1484,8 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=7a5354fe5908b8ac7db163d6c484dbf1d85a142e#7a5354fe5908b8ac7db163d6c484dbf1d85a142e"
version = "29.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
dependencies = [
"arrow",
"async-trait",
@ -1501,8 +1501,8 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=7a5354fe5908b8ac7db163d6c484dbf1d85a142e#7a5354fe5908b8ac7db163d6c484dbf1d85a142e"
version = "29.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
dependencies = [
"ahash",
"arrow",
@ -1535,8 +1535,8 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=7a5354fe5908b8ac7db163d6c484dbf1d85a142e#7a5354fe5908b8ac7db163d6c484dbf1d85a142e"
version = "29.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
dependencies = [
"arrow",
"chrono",
@ -1549,8 +1549,8 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=7a5354fe5908b8ac7db163d6c484dbf1d85a142e#7a5354fe5908b8ac7db163d6c484dbf1d85a142e"
version = "29.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673#f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673"
dependencies = [
"arrow",
"arrow-schema",

View File

@ -121,8 +121,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "45.0.0" }
arrow-flight = { version = "45.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "7a5354fe5908b8ac7db163d6c484dbf1d85a142e", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "7a5354fe5908b8ac7db163d6c484dbf1d85a142e" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673" }
hashbrown = { version = "0.14.0" }
object_store = { version = "0.6.0" }

View File

@ -349,11 +349,7 @@ async fn query_error_handling() {
.arg("drop table this_table_doesnt_exist")
.assert()
.failure()
.stderr(predicate::str::contains(
"Error while planning query: \
This feature is not implemented: \
Unsupported logical plan: DropTable",
));
.stderr(predicate::str::contains("DDL not supported: DropTable"));
}
.boxed()
})),

View File

@ -532,58 +532,6 @@ async fn issue_4631_b() {
.await
}
/// Each unsupported DDL statement below must be rejected with `InvalidArgument` and an
/// "Unsupported logical plan" planning error that names the rejected plan node.
#[tokio::test]
async fn unsupported_sql_returns_error() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared(database_url).await;
// Expected error text for a rejected logical plan node called `name`.
fn make_error_message(name: &str) -> String {
format!("Error while planning query: This feature is not implemented: Unsupported logical plan: {name}")
}
StepTest::new(
&mut cluster,
vec![
// Write one line of line protocol for `this_table_does_exist`, the table that the
// statements below refer to.
Step::WriteLineProtocol("this_table_does_exist,tag=A val=\"foo\" 1".into()),
Step::QueryExpectingError {
sql: "drop table this_table_does_exist".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DropTable"),
},
Step::QueryExpectingError {
sql: "create view some_view as select * from this_table_does_exist".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("CreateView"),
},
Step::QueryExpectingError {
sql: "drop view some_view".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DropView"),
},
Step::QueryExpectingError {
sql: "create database my_new_database".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("CreateCatalog"),
},
Step::QueryExpectingError {
sql: "create schema foo".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("CreateCatalogSchema"),
},
Step::QueryExpectingError {
sql: "create external table foo stored as csv location '/etc/hosts'".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("CreateExternalTable"),
},
],
)
.run()
.await
}
#[tokio::test]
async fn table_or_namespace_not_found() {
test_helpers::maybe_start_logging();

View File

@ -29,41 +29,6 @@ async fn schema_merge_nonexistent_column() {
.await;
}
/// `CREATE EXTERNAL TABLE` must be rejected with `InvalidArgument` and an
/// "Unsupported logical plan: CreateExternalTable" planning error.
#[tokio::test]
async fn create_external_table() {
// Datafusion supports `CREATE EXTERNAL TABLE`, but IOx should not (as that would be a security
// hole)
SqlErrorTest {
// This test doesn't actually depend on any particular data, but to get to the error, the
// namespace needs to exist.
setup_name: "OneMeasurementWithTags",
sql: "CREATE EXTERNAL TABLE foo(ts TIMESTAMP) STORED AS CSV LOCATION '/tmp/foo.csv'",
expected_error_code: tonic::Code::InvalidArgument,
expected_message: "Error while planning query: This feature is not implemented: \
Unsupported logical plan: CreateExternalTable",
}
.run()
.await;
}
/// `CREATE SCHEMA` must be rejected with `InvalidArgument` and an
/// "Unsupported logical plan: CreateCatalogSchema" planning error.
#[tokio::test]
async fn create_schema() {
// Datafusion supports `CREATE SCHEMA`, but IOx should not (as that would be a security
// hole)
SqlErrorTest {
// This test doesn't actually depend on any particular data, but to get to the error, the
// namespace needs to exist.
setup_name: "OneMeasurementWithTags",
sql: "CREATE SCHEMA foo",
expected_error_code: tonic::Code::InvalidArgument,
expected_message: "Error while planning query: \
This feature is not implemented: \
Unsupported logical plan: CreateCatalogSchema",
}
.run()
.await;
}
#[tokio::test]
async fn bad_selector_num_args() {
SqlErrorTest {
@ -124,3 +89,76 @@ impl SqlErrorTest {
.await;
}
}
#[tokio::test]
async fn unsupported_sql_returns_error() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared(database_url).await;
fn make_error_message(op_type: &str, name: &str) -> String {
format!(
"Error while planning query: Error during planning: {op_type} not supported: {name}"
)
}
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol("this_table_does_exist,tag=A val=\"foo\" 1".into()),
// DDL (creating catalogs, etc)
Step::QueryExpectingError {
sql: "drop table this_table_does_exist".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DDL", "DropTable"),
},
Step::QueryExpectingError {
sql: "create view some_view as select * from this_table_does_exist".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DDL", "CreateView"),
},
Step::QueryExpectingError {
sql: "drop view some_view".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DDL", "DropView"),
},
Step::QueryExpectingError {
sql: "create database my_new_database".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DDL", "CreateCatalog"),
},
Step::QueryExpectingError {
sql: "create schema foo".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DDL", "CreateCatalogSchema"),
},
Step::QueryExpectingError {
sql: "create external table foo stored as csv location '/etc/hosts'".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DDL", "CreateExternalTable"),
},
// DML (insert/copy)
Step::QueryExpectingError {
sql: "set max_parquet_fanout = 4".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("Statement", "SetVariable"),
},
// DML (insert/copy)
Step::QueryExpectingError {
sql: "insert into this_table_does_exist values ('foo', 1, now())".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DML", "Insert Into"),
},
// DML COPY
Step::QueryExpectingError {
sql: "copy (select 1) to '/tmp/foo.parquet'".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message: make_error_message("DML", "COPY"),
},
],
)
.run()
.await
}

View File

@ -34,6 +34,10 @@ use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::{
catalog::CatalogProvider,
common::{
plan_err,
tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
},
execution::{
context::{QueryPlanner, SessionState, TaskContext},
memory_pool::MemoryPool,
@ -59,7 +63,7 @@ use trace::{
};
// Reuse DataFusion error and Result types for this module
pub use datafusion::error::{DataFusionError as Error, Result};
pub use datafusion::error::{DataFusionError, Result};
/// This structure implements the DataFusion notion of "query planner"
/// and is needed to create plans with the IOx extension nodes.
@ -343,7 +347,10 @@ impl IOxSessionContext {
let ctx = self.child_ctx("sql_to_logical_plan");
debug!(text=%sql, "planning SQL query");
// NOTE can not use ctx.inner.sql() here as it also interprets DDL
ctx.inner.state().create_logical_plan(sql).await
let plan = ctx.inner.state().create_logical_plan(sql).await?;
// TODO use better API: https://github.com/apache/arrow-datafusion/issues/7328
verify_plan(&plan)?;
Ok(plan)
}
/// Create a logical plan that reads a single [`RecordBatch`]. Use
@ -500,7 +507,7 @@ impl IOxSessionContext {
})
.await?;
Ok::<_, Error>(CrossRtStream::new_with_df_error_stream(stream, exec))
Ok::<_, DataFusionError>(CrossRtStream::new_with_df_error_stream(stream, exec))
}
})
.try_flatten()
@ -515,9 +522,10 @@ impl IOxSessionContext {
return Ok(None);
}
let series: Vec<Series> = series_set
.try_into_series(points_per_batch)
.map_err(|e| Error::Execution(format!("Error converting to series: {e}")))?;
let series: Vec<Series> =
series_set.try_into_series(points_per_batch).map_err(|e| {
DataFusionError::Execution(format!("Error converting to series: {e}"))
})?;
Ok(Some(futures::stream::iter(series).map(Ok)))
})
.try_flatten();
@ -553,9 +561,9 @@ impl IOxSessionContext {
.await?
.into_fieldlist()
.map_err(|e| {
Error::Context(
DataFusionError::Context(
"Error converting to field list".to_string(),
Box::new(Error::External(Box::new(e))),
Box::new(DataFusionError::External(Box::new(e))),
)
})?;
@ -580,9 +588,9 @@ impl IOxSessionContext {
// TODO: Stream this
results.into_fieldlist().map_err(|e| {
Error::Context(
DataFusionError::Context(
"Error converting to field list".to_string(),
Box::new(Error::External(Box::new(e))),
Box::new(DataFusionError::External(Box::new(e))),
)
})
}
@ -598,9 +606,9 @@ impl IOxSessionContext {
.await?
.into_stringset()
.map_err(|e| {
Error::Context(
DataFusionError::Context(
"Error converting to stringset".to_string(),
Box::new(Error::External(Box::new(e))),
Box::new(DataFusionError::External(Box::new(e))),
)
}),
}
@ -646,9 +654,9 @@ impl IOxSessionContext {
T: Send + 'static,
{
exec.spawn(fut).await.unwrap_or_else(|e| {
Err(Error::Context(
Err(DataFusionError::Context(
"Join Error".to_string(),
Box::new(Error::External(Box::new(e))),
Box::new(DataFusionError::External(Box::new(e))),
))
})
}
@ -688,6 +696,32 @@ impl IOxSessionContext {
}
}
/// Rejects logical plans that contain statements IOx does not allow:
///
/// * DDL (`CREATE TABLE`) - creates state in a context that is dropped at the end of the request
/// * Statements (`SET VARIABLE`) - can cause denial of service by using more memory or CPU
/// * DML (`INSERT`, `COPY`) - can write local files so is a security risk on servers
///
/// Returns `Ok(())` when the visitor finds no such node, otherwise the visitor's error.
fn verify_plan(plan: &LogicalPlan) -> Result<()> {
    let mut visitor = BadPlanVisitor {};
    // Only the error matters; the final recursion state is discarded.
    plan.visit(&mut visitor).map(|_| ())
}
/// Stateless plan visitor that fails on the plan node kinds IOx refuses to run.
struct BadPlanVisitor {}
impl TreeNodeVisitor for BadPlanVisitor {
type N = LogicalPlan;
/// Returns a planning error naming the operation for DDL, DML, `COPY` and statement nodes;
/// every other node kind lets the traversal continue.
fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
match node {
LogicalPlan::Ddl(ddl) => plan_err!("DDL not supported: {}", ddl.name()),
LogicalPlan::Dml(dml) => plan_err!("DML not supported: {}", dml.op),
// COPY is its own plan node but is reported as DML.
LogicalPlan::Copy(_) => plan_err!("DML not supported: COPY"),
LogicalPlan::Statement(stmt) => plan_err!("Statement not supported: {}", stmt.name()),
_ => Ok(VisitRecursion::Continue),
}
}
}
/// Extension trait to pull IOx spans out of DataFusion contexts.
pub trait SessionContextIOxExt {
/// Get child span of the current context.

View File

@ -201,8 +201,8 @@ impl ExecutionPlan for DeduplicateExec {
vec![true]
}
fn benefits_from_input_partitioning(&self) -> bool {
false
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false]
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {

View File

@ -736,7 +736,7 @@ mod test {
let expr = binary_expr("boolean_field".as_expr(), Operator::BitwiseXor, lit(true));
assert_eq!(
rewrite(expr),
"CAST(CAST(boolean_field AS Int8) # CAST(Boolean(true) AS Int8) AS Boolean)"
"CAST(CAST(boolean_field AS Int8) BIT_XOR CAST(Boolean(true) AS Int8) AS Boolean)"
);
// Unsupported operations

View File

@ -3571,28 +3571,24 @@ mod tests {
tag_key_meta_names: TagKeyMetaNames::Text as i32,
};
let expected_message = "No function matches the given name and argument types 'AVG(Utf8)'";
let tonic_status = storage_client
.read_window_aggregate(request)
.await
.unwrap_err();
assert!(tonic_status
.message()
.contains("Avg does not support inputs of type Utf8"));
assert_contains!(tonic_status.message(), expected_message);
assert_eq!(tonic::Code::InvalidArgument, tonic_status.code());
let mut rpc_status = GrpcStatus::decode(tonic_status.details()).unwrap();
assert!(rpc_status
.message
.contains("Avg does not support inputs of type Utf8"));
assert_contains!(rpc_status.message, expected_message);
assert_eq!(tonic::Code::InvalidArgument as i32, rpc_status.code);
assert_eq!(1, rpc_status.details.len());
let detail = rpc_status.details.pop().unwrap();
let influx_err = InfluxDbError::decode(detail.value).unwrap();
assert_eq!("invalid", influx_err.code);
assert!(influx_err
.message
.contains("Avg does not support inputs of type Utf8"));
assert_contains!(influx_err.message, expected_message);
assert_eq!("iox/influxrpc", influx_err.op);
assert_eq!(None, influx_err.error);
}

View File

@ -28,9 +28,9 @@ bytes = { version = "1" }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
crossbeam-utils = { version = "0.8" }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "7a5354fe5908b8ac7db163d6c484dbf1d85a142e" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "7a5354fe5908b8ac7db163d6c484dbf1d85a142e", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "7a5354fe5908b8ac7db163d6c484dbf1d85a142e", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f2c0100a5a10bf3ea166a1a590d94b8c9b6cf673", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1", features = ["serde"] }
fixedbitset = { version = "0.4" }