chore: Upgrade datafusion (#6467)

* chore: Update datafusion

* fix: Update for new apis

* chore: Update expected plan

* fix: Update for new config construction

* chore: update clippy

* fix: Fix error codes

* fix: update another test

* chore: Run cargo hakari tasks

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2023-01-03 10:29:11 -05:00 committed by GitHub
parent 90d53ae383
commit dbe52f1ca1
10 changed files with 68 additions and 67 deletions

Cargo.lock (generated)

@@ -1295,7 +1295,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
 "ahash 0.8.2",
 "arrow",
@@ -1340,7 +1340,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
 "arrow",
 "chrono",
@@ -1352,7 +1352,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
 "ahash 0.8.2",
 "arrow",
@@ -1364,7 +1364,7 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
 "arrow",
 "async-trait",
@@ -1379,7 +1379,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
 "ahash 0.8.2",
 "arrow",
@@ -1408,7 +1408,7 @@ dependencies = [
 [[package]]
 name = "datafusion-proto"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
 "arrow",
 "chrono",
@@ -1425,7 +1425,7 @@ dependencies = [
 [[package]]
 name = "datafusion-row"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
 "arrow",
 "datafusion-common",
@@ -1436,7 +1436,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
 "arrow-schema",
 "datafusion-common",


@@ -115,8 +115,8 @@ license = "MIT OR Apache-2.0"
 [workspace.dependencies]
 arrow = { version = "29.0.0" }
 arrow-flight = { version = "29.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="891a800ebb170fed018e53846eb569f3e0638857", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="891a800ebb170fed018e53846eb569f3e0638857" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" }
 hashbrown = { version = "0.13.1" }
 parquet = { version = "29.0.0" }


@@ -564,23 +564,16 @@ async fn unsupported_sql_returns_error() {
 },
 Step::QueryExpectingError {
 sql: "create database my_new_database".into(),
-// Should be InvalidArgument after
-// https://github.com/apache/arrow-datafusion/issues/3873
-expected_error_code: tonic::Code::Internal,
-expected_message: "Error while planning query: Internal error: Unsupported \
-logical plan: CreateCatalog. This was likely caused by a bug in DataFusion's \
-code and we would welcome that you file an bug report in our issue tracker"
+expected_error_code: tonic::Code::InvalidArgument,
+expected_message: "Error while planning query: This feature is not implemented: \
+CreateCatalog"
 .into(),
 },
 Step::QueryExpectingError {
 sql: "create schema foo".into(),
-// Should be InvalidArgument after
-// https://github.com/apache/arrow-datafusion/issues/3873
-expected_error_code: tonic::Code::Internal,
-expected_message: "Error while planning query: Internal error: Unsupported \
-logical plan: CreateCatalogSchema. This was likely caused by a bug in \
-DataFusion's code and we would welcome that you file an bug report in our \
-issue tracker"
+expected_error_code: tonic::Code::InvalidArgument,
+expected_message: "Error while planning query: This feature is not implemented: \
+CreateCatalogSchema"
 .into(),
 },
 ],
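The new expectations line up with how `DataFusionError::NotImplemented` displays, combined with an error-mapping layer that surfaces `NotImplemented` planning failures to gRPC clients as `InvalidArgument` instead of `Internal`. A minimal sketch of such a mapping (the `planning_error_to_status` helper is illustrative, not IOx's actual code):

```rust
use datafusion::error::DataFusionError;
use tonic::Status;

// Illustrative mapping: NotImplemented planning errors become
// InvalidArgument; everything else stays Internal.
fn planning_error_to_status(e: &DataFusionError) -> Status {
    let msg = format!("Error while planning query: {e}");
    match e {
        DataFusionError::NotImplemented(_) => Status::invalid_argument(msg),
        _ => Status::internal(msg),
    }
}

fn main() {
    // `NotImplemented` renders with exactly the prefix the updated tests expect.
    let err = DataFusionError::NotImplemented("CreateCatalog".to_string());
    let status = planning_error_to_status(&err);
    assert_eq!(status.code(), tonic::Code::InvalidArgument);
    assert_eq!(
        status.message(),
        "Error while planning query: This feature is not implemented: CreateCatalog"
    );
}
```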


@@ -209,7 +209,15 @@ impl IOxSessionConfig {
 /// Create an ExecutionContext suitable for executing DataFusion plans
 pub fn build(self) -> IOxSessionContext {
-let state = SessionState::with_config_rt(self.session_config, self.runtime)
+let maybe_span = self.span_ctx.child_span("Query Execution");
+let recorder = SpanRecorder::new(maybe_span);
+// attach span to DataFusion session
+let session_config = self
+.session_config
+.with_extension(Arc::new(recorder.span().cloned()));
+let state = SessionState::with_config_rt(session_config, self.runtime)
 .with_query_planner(Arc::new(IOxQueryPlanner {}));
 let state = register_selector_aggregates(state);
@@ -222,9 +230,7 @@ impl IOxSessionConfig {
 inner.register_catalog(DEFAULT_CATALOG, default_catalog);
 }
-let maybe_span = self.span_ctx.child_span("Query Execution");
-IOxSessionContext::new(inner, self.exec, SpanRecorder::new(maybe_span))
+IOxSessionContext::new(inner, self.exec, recorder)
 }
 }
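`SessionConfig::with_extension` stores any `Send + Sync` value in a type-keyed map on the config, which is why the span can be attached before the `SessionState` is built (rather than mutating the state afterwards, as the block removed further down did) and then recovered later, e.g. inside a custom query planner. A rough sketch of the mechanism, using a stand-in `QuerySpan` type rather than IOx's `SpanRecorder`:

```rust
use std::sync::Arc;
use datafusion::prelude::SessionConfig;

// Stand-in extension type; any `Send + Sync + 'static` type qualifies.
#[derive(Debug)]
struct QuerySpan(&'static str);

fn main() {
    // Attach the value before building the session state...
    let config = SessionConfig::new()
        .with_extension(Arc::new(QuerySpan("Query Execution")));

    // ...and look it up again by type wherever the config is visible.
    let span: Option<Arc<QuerySpan>> = config.get_extension::<QuerySpan>();
    assert_eq!(span.unwrap().0, "Query Execution");
}
```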
@@ -284,15 +290,6 @@ impl IOxSessionContext {
 exec: DedicatedExecutor,
 recorder: SpanRecorder,
 ) -> Self {
-// attach span to DataFusion session
-{
-let mut state = inner.state.write();
-state.config = state
-.config
-.clone()
-.with_extension(Arc::new(recorder.span().cloned()));
-}
 Self {
 inner,
 exec,
@@ -310,11 +307,21 @@ impl IOxSessionContext {
 pub async fn prepare_sql(&self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
 let ctx = self.child_ctx("prepare_sql");
 debug!(text=%sql, "planning SQL query");
 // NOTE can not use ctx.inner.sql here as it also interprets DDL
+#[allow(deprecated)]
 let logical_plan = ctx.inner.create_logical_plan(sql)?;
 debug!(plan=%logical_plan.display_graphviz(), "logical plan");
-// Handle unsupported SQL
+// Make nicer errors for unsupported SQL
+// (By default datafusion returns Internal Error)
+match &logical_plan {
+LogicalPlan::CreateCatalog(_) => {
+return Err(Error::NotImplemented("CreateCatalog".to_string()));
+}
+LogicalPlan::CreateCatalogSchema(_) => {
+return Err(Error::NotImplemented("CreateCatalogSchema".to_string()));
+}
+LogicalPlan::CreateMemoryTable(_) => {
+return Err(Error::NotImplemented("CreateMemoryTable".to_string()));
+}
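Screening the logical plan up front is what produces the `NotImplemented` errors asserted in the end-to-end tests above; `ctx.inner.sql` cannot be used here because it would also execute DDL. A condensed sketch of the same guard (using a plain `String` error instead of IOx's `Error` type):

```rust
use datafusion::logical_expr::LogicalPlan;

// Reject DDL plans the server refuses to run, mirroring the match above.
fn check_plan_supported(plan: &LogicalPlan) -> Result<(), String> {
    let unsupported = match plan {
        LogicalPlan::CreateCatalog(_) => "CreateCatalog",
        LogicalPlan::CreateCatalogSchema(_) => "CreateCatalogSchema",
        LogicalPlan::CreateMemoryTable(_) => "CreateMemoryTable",
        _ => return Ok(()),
    };
    Err(format!("This feature is not implemented: {unsupported}"))
}
```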


@@ -197,7 +197,6 @@ pub fn chunks_to_physical_nodes(
 projection: None,
 limit: None,
 table_partition_cols: vec![],
-config_options: context.session_config().config_options(),
 output_ordering,
 };
 let meta_size_hint = None;


@@ -121,8 +121,6 @@ impl ParquetExecInput {
 projection: None,
 limit: None,
 table_partition_cols: vec![],
-// TODO avoid this `copied_config` when config_options are directly available on context
-config_options: session_ctx.copied_config().config_options(),
 // Parquet files ARE actually sorted but we don't care here since we just construct a `collect` plan.
 output_ordering: None,
 };
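Both removals reflect the same upstream change: `FileScanConfig` no longer carries `ConfigOptions`; those now travel with the session and are read from the `SessionState`/`TaskContext` when the plan runs. A hedged sketch of constructing the trimmed-down struct (field set assumed accurate for this pinned revision; schema, path, and size are placeholders):

```rust
use std::sync::Arc;
use arrow::datatypes::Schema;
use datafusion::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
use datafusion::physical_plan::{file_format::FileScanConfig, Statistics};

fn main() {
    // Single-file scan over a placeholder file; note the absence of a
    // `config_options` field compared to the removed lines above.
    let base_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::local_filesystem(),
        file_schema: Arc::new(Schema::empty()),
        file_groups: vec![vec![PartitionedFile::new("data.parquet".to_string(), 1024)]],
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering: None,
    };
    drop(base_config);
}
```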


@@ -7,13 +7,16 @@ use datafusion::{
 listing::PartitionedFile,
 object_store::ObjectStoreUrl,
 },
-execution::context::TaskContext,
+execution::{
+context::{SessionState, TaskContext},
+runtime_env::RuntimeEnv,
+},
 physical_plan::{
 execute_stream,
 file_format::{FileScanConfig, ParquetExec},
 SendableRecordBatchStream, Statistics,
 },
-prelude::{SessionConfig, SessionContext},
+prelude::SessionContext,
 };
 use datafusion_util::config::iox_session_config;
 use futures::StreamExt;
@@ -163,7 +166,7 @@ pub struct ParquetFileReader {
 schema: ArrowSchemaRef,
 /// DataFusion configuration, such as the target batchsize, etc
-session_config: SessionConfig,
+session_ctx: SessionContext,
 }
 impl ParquetFileReader {
@@ -173,25 +176,28 @@ impl ParquetFileReader {
 object_store_url: ObjectStoreUrl,
 object_meta: ObjectMeta,
 ) -> Result<Self, Error> {
+let runtime = Arc::new(RuntimeEnv::default());
 let session_config = iox_session_config();
+let session_state = SessionState::with_config_rt(session_config, runtime);
 // Keep metadata so we can find the measurement name
-let format =
-ParquetFormat::new(session_config.config_options()).with_skip_metadata(Some(false));
+let format = ParquetFormat::new().with_skip_metadata(Some(false));
 // Use datafusion parquet reader to read the metadata from the
 // file.
 let schema = format
-.infer_schema(&object_store, &[object_meta.clone()])
+.infer_schema(&session_state, &object_store, &[object_meta.clone()])
 .await
 .context(InferringSchemaSnafu)?;
+let session_ctx = SessionContext::with_state(session_state);
 Ok(Self {
 object_store,
 object_store_url,
 object_meta,
 schema,
-session_config,
+session_ctx,
 })
 }
@@ -216,17 +222,15 @@ impl ParquetFileReader {
 limit: None,
 table_partition_cols: vec![],
 output_ordering: None,
-config_options: self.session_config.config_options(),
 };
 // set up enough datafusion context to do the real read session
 let predicate = None;
 let metadata_size_hint = None;
 let exec = ParquetExec::new(base_config, predicate, metadata_size_hint);
-let session_ctx = SessionContext::with_config(self.session_config.clone());
 let object_store = Arc::clone(&self.object_store);
-let task_ctx = Arc::new(TaskContext::from(&session_ctx));
+let task_ctx = Arc::new(TaskContext::from(&self.session_ctx));
 task_ctx
 .runtime_env()
 .register_object_store("iox", "iox", object_store);
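The reader now holds a long-lived `SessionContext` built from the same `SessionState` used for schema inference, so each read derives its `TaskContext` from that one context instead of constructing a throwaway one. A compressed sketch of the plumbing (default config here rather than `iox_session_config()`):

```rust
use std::sync::Arc;
use datafusion::execution::{
    context::{SessionState, TaskContext},
    runtime_env::RuntimeEnv,
};
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Build one state, wrap it in a context, and derive per-read task contexts.
    let runtime = Arc::new(RuntimeEnv::default());
    let state = SessionState::with_config_rt(SessionConfig::new(), runtime);
    let ctx = SessionContext::with_state(state);
    let task_ctx = Arc::new(TaskContext::from(&ctx));
    // task_ctx.runtime_env() would then be used to register object stores.
    drop(task_ctx);
}
```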


@@ -320,18 +320,18 @@
+-------+--------+--------------------------------+---------+
-- SQL: EXPLAIN SELECT * from restaurant where influx_regex_match(town, 'foo|bar|baz') and influx_regex_not_match(town, 'one|two');
-- Results After Normalizing UUIDs
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: (CAST(restaurant.town AS Utf8)restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(restaurant.town AS Utf8)restaurant.town !~ Utf8("one|two")) |
| | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) !~ Utf8("one|two") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz"), CAST(restaurant.town AS Utf8) !~ Utf8("one|two")] |
| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 ~ foo|bar|baz AND CAST(restaurant.town AS Utf8)restaurant.town@0 !~ one|two |
| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")) AND (CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")), projection=[count, system, time, town] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: (CAST(restaurant.town AS Utf8)restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(restaurant.town AS Utf8)restaurant.town !~ Utf8("one|two")) |
| | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) AS restaurant.town ~ Utf8("foo|bar|baz") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) AS restaurant.town !~ Utf8("one|two") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz"), CAST(restaurant.town AS Utf8) !~ Utf8("one|two")] |
| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 ~ foo|bar|baz AND CAST(restaurant.town AS Utf8)restaurant.town@0 !~ one|two |
| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) AS restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) AS restaurant.town !~ Utf8("one|two")) AND (CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")), projection=[count, system, time, town] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


@@ -352,7 +352,7 @@ async fn sql_create_external_table() {
 #[tokio::test]
 async fn sql_create_schema() {
-let expected_error = "Unsupported logical plan: CreateCatalogSchema";
+let expected_error = "This feature is not implemented: CreateCatalogSchema";
 // Datafusion supports CREATE SCHEMA, but IOx should not (as that would be a security hole)
 run_sql_error_test_case(
 scenarios::delete::NoDeleteOneChunk {},


@@ -28,7 +28,7 @@ bytes = { version = "1", features = ["std"] }
 chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] }
 crossbeam-utils = { version = "0.8", features = ["std"] }
 crypto-common = { version = "0.1", default-features = false, features = ["std"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "891a800ebb170fed018e53846eb569f3e0638857", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
 digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] }
 either = { version = "1", features = ["use_std"] }
 fixedbitset = { version = "0.4", features = ["std"] }