diff --git a/Cargo.lock b/Cargo.lock
index 264ad1c71c..952747b79f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1295,7 +1295,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1340,7 +1340,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
  "arrow",
  "chrono",
@@ -1352,7 +1352,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1364,7 +1364,7 @@ dependencies = [
 [[package]]
 name = "datafusion-optimizer"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
  "arrow",
  "async-trait",
@@ -1379,7 +1379,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
  "ahash 0.8.2",
  "arrow",
@@ -1408,7 +1408,7 @@ dependencies = [
 [[package]]
 name = "datafusion-proto"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
  "arrow",
  "chrono",
@@ -1425,7 +1425,7 @@ dependencies = [
 [[package]]
 name = "datafusion-row"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -1436,7 +1436,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "15.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=891a800ebb170fed018e53846eb569f3e0638857#891a800ebb170fed018e53846eb569f3e0638857"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
 dependencies = [
  "arrow-schema",
  "datafusion-common",
diff --git a/Cargo.toml b/Cargo.toml
index 84fd6c9749..188ae1740a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -115,8 +115,8 @@ license = "MIT OR Apache-2.0"
 
 [workspace.dependencies]
 arrow = { version = "29.0.0" }
 arrow-flight = { version = "29.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="891a800ebb170fed018e53846eb569f3e0638857", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="891a800ebb170fed018e53846eb569f3e0638857" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" }
 hashbrown = { version = "0.13.1" }
 parquet = { version = "29.0.0" }
diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs
index 493ea149a2..d50eede75d 100644
--- a/influxdb_iox/tests/end_to_end_cases/querier.rs
+++ b/influxdb_iox/tests/end_to_end_cases/querier.rs
@@ -564,23 +564,16 @@ async fn unsupported_sql_returns_error() {
         },
         Step::QueryExpectingError {
             sql: "create database my_new_database".into(),
-            // Should be InvalidArgument after
-            // https://github.com/apache/arrow-datafusion/issues/3873
-            expected_error_code: tonic::Code::Internal,
-            expected_message: "Error while planning query: Internal error: Unsupported \
-                logical plan: CreateCatalog. This was likely caused by a bug in DataFusion's \
-                code and we would welcome that you file an bug report in our issue tracker"
+            expected_error_code: tonic::Code::InvalidArgument,
+            expected_message: "Error while planning query: This feature is not implemented: \
+                CreateCatalog"
                 .into(),
         },
         Step::QueryExpectingError {
             sql: "create schema foo".into(),
-            // Should be InvalidArgument after
-            // https://github.com/apache/arrow-datafusion/issues/3873
-            expected_error_code: tonic::Code::Internal,
-            expected_message: "Error while planning query: Internal error: Unsupported \
-                logical plan: CreateCatalogSchema. This was likely caused by a bug in \
-                DataFusion's code and we would welcome that you file an bug report in our \
-                issue tracker"
+            expected_error_code: tonic::Code::InvalidArgument,
+            expected_message: "Error while planning query: This feature is not implemented: \
+                CreateCatalogSchema"
                 .into(),
         },
     ],
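The two querier test updates above track a DataFusion behavior change: unsupported DDL now surfaces as a "not implemented" planner error instead of an internal error, so the querier can map it to `InvalidArgument`. A minimal sketch of that kind of mapping (the helper name is hypothetical, not the querier's actual code):

```rust
use datafusion::error::DataFusionError;
use tonic::Status;

/// Hypothetical translation of planner errors into gRPC statuses:
/// "not implemented" describes the request, so it becomes InvalidArgument,
/// while anything else stays Internal.
fn planner_error_to_status(e: DataFusionError) -> Status {
    match e {
        DataFusionError::NotImplemented(msg) => Status::invalid_argument(format!(
            "Error while planning query: This feature is not implemented: {msg}"
        )),
        other => Status::internal(format!("Error while planning query: {other}")),
    }
}
```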
diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs
index bb1e0016bc..3ffc0cddc8 100644
--- a/iox_query/src/exec/context.rs
+++ b/iox_query/src/exec/context.rs
@@ -209,7 +209,15 @@ impl IOxSessionConfig {
     /// Create an ExecutionContext suitable for executing DataFusion plans
     pub fn build(self) -> IOxSessionContext {
-        let state = SessionState::with_config_rt(self.session_config, self.runtime)
+        let maybe_span = self.span_ctx.child_span("Query Execution");
+        let recorder = SpanRecorder::new(maybe_span);
+
+        // attach span to DataFusion session
+        let session_config = self
+            .session_config
+            .with_extension(Arc::new(recorder.span().cloned()));
+
+        let state = SessionState::with_config_rt(session_config, self.runtime)
             .with_query_planner(Arc::new(IOxQueryPlanner {}));
 
         let state = register_selector_aggregates(state);
 
@@ -222,9 +230,7 @@
             inner.register_catalog(DEFAULT_CATALOG, default_catalog);
         }
 
-        let maybe_span = self.span_ctx.child_span("Query Execution");
-
-        IOxSessionContext::new(inner, self.exec, SpanRecorder::new(maybe_span))
+        IOxSessionContext::new(inner, self.exec, recorder)
     }
 }
@@ -284,15 +290,6 @@
         exec: DedicatedExecutor,
         recorder: SpanRecorder,
     ) -> Self {
-        // attach span to DataFusion session
-        {
-            let mut state = inner.state.write();
-            state.config = state
-                .config
-                .clone()
-                .with_extension(Arc::new(recorder.span().cloned()));
-        }
-
         Self {
             inner,
             exec,
@@ -310,11 +307,21 @@
     pub async fn prepare_sql(&self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
         let ctx = self.child_ctx("prepare_sql");
         debug!(text=%sql, "planning SQL query");
+
+        // NOTE: cannot use ctx.inner.sql here as it also interprets DDL
+        #[allow(deprecated)]
         let logical_plan = ctx.inner.create_logical_plan(sql)?;
         debug!(plan=%logical_plan.display_graphviz(), "logical plan");
 
-        // Handle unsupported SQL
+        // Make nicer errors for unsupported SQL
+        // (by default DataFusion returns an internal error)
         match &logical_plan {
+            LogicalPlan::CreateCatalog(_) => {
+                return Err(Error::NotImplemented("CreateCatalog".to_string()));
+            }
+            LogicalPlan::CreateCatalogSchema(_) => {
+                return Err(Error::NotImplemented("CreateCatalogSchema".to_string()));
+            }
             LogicalPlan::CreateMemoryTable(_) => {
                 return Err(Error::NotImplemented("CreateMemoryTable".to_string()));
             }
diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs
index bd2fbcc575..36b89e6feb 100644
--- a/iox_query/src/provider/physical.rs
+++ b/iox_query/src/provider/physical.rs
@@ -197,7 +197,6 @@ pub fn chunks_to_physical_nodes(
         projection: None,
         limit: None,
         table_partition_cols: vec![],
-        config_options: context.session_config().config_options(),
         output_ordering,
     };
     let meta_size_hint = None;
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index a41d8fda4e..39f1fe050c 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -121,8 +121,6 @@ impl ParquetExecInput {
             projection: None,
             limit: None,
             table_partition_cols: vec![],
-            // TODO avoid this `copied_config` when config_options are directly available on context
-            config_options: session_ctx.copied_config().config_options(),
             // Parquet files ARE actually sorted but we don't care here since we just construct a `collect` plan.
             output_ordering: None,
         };
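Note how `build()` now attaches the trace span via `SessionConfig::with_extension` before the `SessionState` is constructed, instead of mutating `state.config` through the context's lock afterwards. A minimal sketch of that extension round-trip, with an illustrative `QuerySpan` standing in for IOx's span type (and assuming `SessionConfig::get_extension`, which was added alongside `with_extension`):

```rust
use std::sync::Arc;
use datafusion::{
    execution::{context::SessionState, runtime_env::RuntimeEnv},
    prelude::SessionConfig,
};

/// Illustrative stand-in for IOx's trace span.
#[derive(Debug)]
struct QuerySpan(String);

fn main() {
    let runtime = Arc::new(RuntimeEnv::default());
    // Attach the extension before building the state, as the new `build()` does.
    let config = SessionConfig::new()
        .with_extension(Arc::new(QuerySpan("Query Execution".into())));
    let state = SessionState::with_config_rt(config, runtime);

    // Anything that can see the session config can read the extension back.
    let span: Option<Arc<QuerySpan>> = state.config.get_extension::<QuerySpan>();
    assert!(span.is_some());
}
```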
diff --git a/parquet_to_line_protocol/src/lib.rs b/parquet_to_line_protocol/src/lib.rs
index 19f0779d8f..78afaa1aac 100644
--- a/parquet_to_line_protocol/src/lib.rs
+++ b/parquet_to_line_protocol/src/lib.rs
@@ -7,13 +7,16 @@ use datafusion::{
         listing::PartitionedFile,
         object_store::ObjectStoreUrl,
     },
-    execution::context::TaskContext,
+    execution::{
+        context::{SessionState, TaskContext},
+        runtime_env::RuntimeEnv,
+    },
     physical_plan::{
         execute_stream,
         file_format::{FileScanConfig, ParquetExec},
         SendableRecordBatchStream, Statistics,
     },
-    prelude::{SessionConfig, SessionContext},
+    prelude::SessionContext,
 };
 use datafusion_util::config::iox_session_config;
 use futures::StreamExt;
@@ -163,7 +166,7 @@ pub struct ParquetFileReader {
     schema: ArrowSchemaRef,
 
     /// DataFusion configuration, such as the target batch size, etc
-    session_config: SessionConfig,
+    session_ctx: SessionContext,
 }
 
 impl ParquetFileReader {
@@ -173,25 +176,28 @@
         object_store_url: ObjectStoreUrl,
         object_meta: ObjectMeta,
     ) -> Result<Self> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let session_config = iox_session_config();
+        let session_state = SessionState::with_config_rt(session_config, runtime);
 
         // Keep metadata so we can find the measurement name
-        let format =
-            ParquetFormat::new(session_config.config_options()).with_skip_metadata(Some(false));
+        let format = ParquetFormat::new().with_skip_metadata(Some(false));
 
         // Use datafusion parquet reader to read the metadata from the
         // file.
         let schema = format
-            .infer_schema(&object_store, &[object_meta.clone()])
+            .infer_schema(&session_state, &object_store, &[object_meta.clone()])
             .await
             .context(InferringSchemaSnafu)?;
 
+        let session_ctx = SessionContext::with_state(session_state);
+
         Ok(Self {
             object_store,
             object_store_url,
             object_meta,
             schema,
-            session_config,
+            session_ctx,
         })
     }
 
@@ -216,17 +222,15 @@
             limit: None,
             table_partition_cols: vec![],
             output_ordering: None,
-            config_options: self.session_config.config_options(),
         };
 
         // set up enough datafusion context to do the real read session
         let predicate = None;
         let metadata_size_hint = None;
         let exec = ParquetExec::new(base_config, predicate, metadata_size_hint);
-        let session_ctx = SessionContext::with_config(self.session_config.clone());
 
         let object_store = Arc::clone(&self.object_store);
-        let task_ctx = Arc::new(TaskContext::from(&session_ctx));
+        let task_ctx = Arc::new(TaskContext::from(&self.session_ctx));
         task_ctx
             .runtime_env()
             .register_object_store("iox", "iox", object_store);
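The `ParquetFileReader` now builds one `SessionState` up front: `ParquetFormat::new()` no longer takes config options, and `infer_schema` instead receives the state as its first argument. A condensed sketch of the resulting plumbing, using a default `SessionConfig` where IOx uses `iox_session_config()`:

```rust
use std::sync::Arc;
use datafusion::{
    execution::{
        context::{SessionState, TaskContext},
        runtime_env::RuntimeEnv,
    },
    prelude::{SessionConfig, SessionContext},
};

/// One SessionState now drives both schema inference and execution, so the
/// SessionConfig no longer has to be cloned into each consumer.
fn session_plumbing() -> Arc<TaskContext> {
    let runtime = Arc::new(RuntimeEnv::default());
    let session_state = SessionState::with_config_rt(SessionConfig::new(), runtime);

    // `ParquetFormat::new().with_skip_metadata(Some(false))` and
    // `format.infer_schema(&session_state, &store, &[meta])` would run here.

    let session_ctx = SessionContext::with_state(session_state);
    Arc::new(TaskContext::from(&session_ctx))
}
```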
diff --git a/query_tests/cases/in/pushdown.expected b/query_tests/cases/in/pushdown.expected
index 14a6cfe0e1..8a472e36f1 100644
--- a/query_tests/cases/in/pushdown.expected
+++ b/query_tests/cases/in/pushdown.expected
@@ -320,18 +320,18 @@
 +-------+--------+--------------------------------+---------+
 -- SQL: EXPLAIN SELECT * from restaurant where influx_regex_match(town, 'foo|bar|baz') and influx_regex_not_match(town, 'one|two');
 -- Results After Normalizing UUIDs
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type     | plan |
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan  | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
-|               | Filter: (CAST(restaurant.town AS Utf8)restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(restaurant.town AS Utf8)restaurant.town !~ Utf8("one|two")) |
-|               | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
-|               | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) !~ Utf8("one|two") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz"), CAST(restaurant.town AS Utf8) !~ Utf8("one|two")] |
-| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
-|               | CoalesceBatchesExec: target_batch_size=4096 |
-|               | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 ~ foo|bar|baz AND CAST(restaurant.town AS Utf8)restaurant.town@0 !~ one|two |
-|               | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
-|               | RepartitionExec: partitioning=RoundRobinBatch(4) |
-|               | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")) AND (CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")), projection=[count, system, time, town] |
-|               | |
-+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
+|               | Filter: (CAST(restaurant.town AS Utf8)restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(restaurant.town AS Utf8)restaurant.town !~ Utf8("one|two")) |
+|               | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
+|               | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) AS restaurant.town ~ Utf8("foo|bar|baz") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) AS restaurant.town !~ Utf8("one|two") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz"), CAST(restaurant.town AS Utf8) !~ Utf8("one|two")] |
+| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
+|               | CoalesceBatchesExec: target_batch_size=4096 |
+|               | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 ~ foo|bar|baz AND CAST(restaurant.town AS Utf8)restaurant.town@0 !~ one|two |
+|               | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
+|               | RepartitionExec: partitioning=RoundRobinBatch(4) |
+|               | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) AS restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) AS restaurant.town !~ Utf8("one|two")) AND (CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")), projection=[count, system, time, town] |
+|               | |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs
index b5ab9ab0b8..b5dfca87f3 100644
--- a/query_tests/src/sql.rs
+++ b/query_tests/src/sql.rs
@@ -352,7 +352,7 @@ async fn sql_create_external_table() {
 
 #[tokio::test]
 async fn sql_create_schema() {
-    let expected_error = "Unsupported logical plan: CreateCatalogSchema";
+    let expected_error = "This feature is not implemented: CreateCatalogSchema";
     // Datafusion supports CREATE SCHEMA, but IOx should not (as that would be a security hole)
     run_sql_error_test_case(
         scenarios::delete::NoDeleteOneChunk {},
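The expected error string asserted here comes from the explicit DDL guard in `prepare_sql` (see the `iox_query/src/exec/context.rs` changes above). A standalone sketch of that match, with a plain `String` error standing in for IOx's error type:

```rust
use datafusion::logical_expr::LogicalPlan;

/// Reject DDL plans that IOx does not (and should not) support.
fn reject_unsupported(plan: &LogicalPlan) -> Result<(), String> {
    let name = match plan {
        LogicalPlan::CreateCatalog(_) => Some("CreateCatalog"),
        LogicalPlan::CreateCatalogSchema(_) => Some("CreateCatalogSchema"),
        LogicalPlan::CreateMemoryTable(_) => Some("CreateMemoryTable"),
        _ => None,
    };
    match name {
        Some(n) => Err(format!("This feature is not implemented: {n}")),
        None => Ok(()),
    }
}
```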
"unicode_expressions", "xz2"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] } digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] } either = { version = "1", features = ["use_std"] } fixedbitset = { version = "0.4", features = ["std"] }