diff --git a/Cargo.lock b/Cargo.lock index a0a661f7db..6fed5019e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstyle" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80c697cc33851b02ab0c26b2e8a211684fbe627ff1cc506131f35026dd7686dd" + [[package]] name = "anyhow" version = "1.0.69" @@ -360,10 +366,11 @@ dependencies = [ [[package]] name = "assert_cmd" -version = "2.0.8" +version = "2.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9834fcc22e0874394a010230586367d4a3e9f11b560f469262678547e1d2575e" +checksum = "c0dcbed38184f9219183fcf38beb4cdbf5df7163a6d7cd227c6ac89b7966d6fe" dependencies = [ + "anstyle", "bstr", "doc-comment", "predicates", @@ -4320,10 +4327,11 @@ dependencies = [ [[package]] name = "predicates" -version = "2.1.5" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +checksum = "1ba7d6ead3e3966038f68caa9fc1f860185d95a793180bbcfe0d0da47b3961ed" dependencies = [ + "anstyle", "difflib", "float-cmp", "itertools", @@ -4334,9 +4342,9 @@ dependencies = [ [[package]] name = "predicates-core" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72f883590242d3c6fc5bf50299011695fa6590c2c70eac95ee1bdb9a733ad1a2" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" [[package]] name = "predicates-tree" diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 2344cba42b..6b2ccc7af2 100644 --- a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -4,7 +4,7 @@ use std::fmt::Display; use arrow_flight::sql::{ ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, - CommandPreparedStatementQuery, CommandStatementQuery, + CommandGetCatalogs, CommandPreparedStatementQuery, CommandStatementQuery, }; use bytes::Bytes; use prost::Message; @@ -14,7 +14,7 @@ use crate::error::*; /// Represents a prepared statement "handle". 
IOx passes all state /// required to run the prepared statement back and forth to the -/// client so any querier instance can run it +/// client, so any querier instance can run it #[derive(Debug, Clone, PartialEq)] pub struct PreparedStatementHandle { /// The raw SQL query text @@ -57,12 +57,17 @@ impl From for Bytes { } } -/// Decoded / validated FlightSQL command messages +/// Decoded / validated FlightSQL command messages +/// +/// Handles encoding/decoding prost::Any messages back +/// and forth to native Rust types #[derive(Debug, Clone, PartialEq)] pub enum FlightSQLCommand { CommandStatementQuery(String), /// Run a prepared statement CommandPreparedStatementQuery(PreparedStatementHandle), + /// Get a list of the available catalogs + CommandGetCatalogs(), /// Create a prepared statement ActionCreatePreparedStatementRequest(String), /// Close a prepared statement @@ -74,6 +79,7 @@ impl Display for FlightSQLCommand { match self { Self::CommandStatementQuery(q) => write!(f, "CommandStatementQuery{q}"), Self::CommandPreparedStatementQuery(h) => write!(f, "CommandPreparedStatementQuery{h}"), + Self::CommandGetCatalogs() => write!(f, "CommandGetCatalogs"), Self::ActionCreatePreparedStatementRequest(q) => { write!(f, "ActionCreatePreparedStatementRequest{q}") } @@ -100,6 +106,10 @@ impl FlightSQLCommand { let handle = PreparedStatementHandle::try_decode(prepared_statement_handle)?; Ok(Self::CommandPreparedStatementQuery(handle)) + } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { + let CommandGetCatalogs {} = decoded_cmd; + + Ok(Self::CommandGetCatalogs()) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { let ActionCreatePreparedStatementRequest { query } = decoded_cmd; @@ -131,6 +141,7 @@ impl FlightSQLCommand { prepared_statement_handle, }) } + FlightSQLCommand::CommandGetCatalogs() => Any::pack(&CommandGetCatalogs {}), FlightSQLCommand::ActionCreatePreparedStatementRequest(query) => { Any::pack(&ActionCreatePreparedStatementRequest { query }) } diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index 6779cc3931..b7ca27ba93 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -1,13 +1,13 @@ //! 
FlightSQL handling use std::sync::Arc; -use arrow::{error::ArrowError, ipc::writer::IpcWriteOptions}; +use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions}; use arrow_flight::{ sql::{ActionCreatePreparedStatementResult, Any}, IpcMessage, SchemaAsIpc, }; use bytes::Bytes; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan}; use iox_query::{exec::IOxSessionContext, QueryNamespace}; use observability_deps::tracing::debug; use prost::Message; @@ -35,12 +35,17 @@ impl FlightSQLPlanner { match cmd { FlightSQLCommand::CommandStatementQuery(query) => { - Self::get_schema_for_query(&query, ctx).await + get_schema_for_query(&query, ctx).await } FlightSQLCommand::CommandPreparedStatementQuery(handle) => { - Self::get_schema_for_query(handle.query(), ctx).await + get_schema_for_query(handle.query(), ctx).await } - _ => ProtocolSnafu { + FlightSQLCommand::CommandGetCatalogs() => { + let plan = plan_get_catalogs(ctx).await?; + get_schema_for_plan(plan) + } + FlightSQLCommand::ActionCreatePreparedStatementRequest(_) + | FlightSQLCommand::ActionClosePreparedStatementRequest(_) => ProtocolSnafu { cmd: format!("{cmd:?}"), method: "GetFlightInfo", } @@ -48,24 +53,6 @@ impl FlightSQLPlanner { } } - /// Return the schema for the specified query - /// - /// returns: IPC encoded (schema_bytes) for this query - async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result { - // gather real schema, but only - let logical_plan = ctx.plan_sql(query).await?; - let schema = arrow::datatypes::Schema::from(logical_plan.schema().as_ref()); - let options = IpcWriteOptions::default(); - - // encode the schema into the correct form - let message: Result = - SchemaAsIpc::new(&schema, &options).try_into(); - - let IpcMessage(schema) = message?; - - Ok(schema) - } - /// Returns a plan that computes results requested in msg pub async fn do_get( namespace_name: impl Into, @@ -86,7 +73,14 @@ impl FlightSQLPlanner { debug!(%query, "Planning FlightSQL prepared query"); Ok(ctx.prepare_sql(query).await?) } - _ => ProtocolSnafu { + FlightSQLCommand::CommandGetCatalogs() => { + debug!("Planning GetCatalogs query"); + let plan = plan_get_catalogs(ctx).await?; + Ok(ctx.create_physical_plan(&plan).await?) + } + + FlightSQLCommand::ActionClosePreparedStatementRequest(_) + | FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => ProtocolSnafu { cmd: format!("{cmd:?}"), method: "DoGet", } @@ -114,7 +108,7 @@ impl FlightSQLPlanner { // see https://github.com/apache/arrow-datafusion/pull/4701 let parameter_schema = vec![]; - let dataset_schema = Self::get_schema_for_query(&query, ctx).await?; + let dataset_schema = get_schema_for_query(&query, ctx).await?; let handle = PreparedStatementHandle::new(query); let result = ActionCreatePreparedStatementResult { @@ -141,3 +135,41 @@ impl FlightSQLPlanner { } } } + +/// Return the schema for the specified query +/// +/// returns: IPC encoded (schema_bytes) for this query +async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result { + get_schema_for_plan(ctx.plan_sql(query).await?) 
+} + +/// Return the schema for the specified logical plan +/// +/// returns: IPC encoded (schema_bytes) for this query +fn get_schema_for_plan(logical_plan: LogicalPlan) -> Result { + // gather real schema, but only + let schema = Schema::from(logical_plan.schema().as_ref()); + encode_schema(&schema) +} + +/// Encodes the schema IPC encoded (schema_bytes) +fn encode_schema(schema: &Schema) -> Result { + let options = IpcWriteOptions::default(); + + // encode the schema into the correct form + let message: Result = SchemaAsIpc::new(schema, &options).try_into(); + + let IpcMessage(schema) = message?; + + Ok(schema) +} + +/// Return a `LogicalPlan` for GetCatalogs +/// +/// In the future this could be made more efficient by building the +/// response directly from the IOx catalog rather than running an +/// entire DataFusion plan. +async fn plan_get_catalogs(ctx: &IOxSessionContext) -> Result { + let query = "SELECT DISTINCT table_catalog AS catalog_name FROM information_schema.tables ORDER BY table_catalog"; + Ok(ctx.plan_sql(query).await?) +} diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 70bbcf028a..a4da9dcc81 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -83,10 +83,10 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] # In alphabetical order arrow_util = { path = "../arrow_util" } -assert_cmd = "2.0.8" +assert_cmd = "2.0.9" async-trait = "0.1" predicate = { path = "../predicate" } -predicates = "2.1.0" +predicates = "3.0.1" serde = "1.0.156" test_helpers = { path = "../test_helpers", features = ["future_timeout"] } test_helpers_end_to_end = { path = "../test_helpers_end_to_end" } diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index a0411ccb64..17e57379d8 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -1,5 +1,7 @@ use std::path::PathBuf; +use arrow::record_batch::RecordBatch; +use arrow_flight::decode::FlightRecordBatchStream; use arrow_util::assert_batches_sorted_eq; use assert_cmd::Command; use datafusion::common::assert_contains; @@ -37,22 +39,10 @@ async fn flightsql_adhoc_query() { "+------+------+--------------------------------+-----+", ]; - let connection = state.cluster().querier().querier_grpc_connection(); - let (channel, _headers) = connection.into_grpc_connection().into_parts(); + let mut client = flightsql_client(state.cluster()); - let mut client = FlightSqlClient::new(channel); - - // Add namespace to client headers until it is fully supported by FlightSQL - let namespace = state.cluster().namespace(); - client.add_header("iox-namespace-name", namespace).unwrap(); - - let batches: Vec<_> = client - .query(sql) - .await - .expect("ran SQL query") - .try_collect() - .await - .expect("got batches"); + let stream = client.query(sql).await.unwrap(); + let batches = collect_stream(stream).await; assert_batches_sorted_eq!(&expected, &batches); } @@ -84,14 +74,7 @@ async fn flightsql_adhoc_query_error() { async move { let sql = String::from("select * from incorrect_table"); - let connection = state.cluster().querier().querier_grpc_connection(); - let (channel, _headers) = connection.into_grpc_connection().into_parts(); - - let mut client = FlightSqlClient::new(channel); - - // Add namespace to client headers until it is fully supported by FlightSQL - let namespace = state.cluster().namespace(); - client.add_header("iox-namespace-name", namespace).unwrap(); + let 
mut client = flightsql_client(state.cluster()); let err = client.query(sql).await.unwrap_err(); @@ -138,24 +121,54 @@ async fn flightsql_prepared_query() { "+------+------+--------------------------------+-----+", ]; - let connection = state.cluster().querier().querier_grpc_connection(); - let (channel, _headers) = connection.into_grpc_connection().into_parts(); - - let mut client = FlightSqlClient::new(channel); - - // Add namespace to client headers until it is fully supported by FlightSQL - let namespace = state.cluster().namespace(); - client.add_header("iox-namespace-name", namespace).unwrap(); + let mut client = flightsql_client(state.cluster()); let handle = client.prepare(sql).await.unwrap(); + let stream = client.execute(handle).await.unwrap(); - let batches: Vec<_> = client - .execute(handle) - .await - .expect("ran SQL query") - .try_collect() - .await - .expect("got batches"); + let batches = collect_stream(stream).await; + + assert_batches_sorted_eq!(&expected, &batches); + } + .boxed() + })), + ], + ) + .run() + .await +} + +#[tokio::test] +async fn flightsql_get_catalogs() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!( + "{table_name},tag1=A,tag2=B val=42i 123456\n\ + {table_name},tag1=A,tag2=C val=43i 123457" + )), + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let expected = vec![ + "+--------------+", + "| catalog_name |", + "+--------------+", + "| public |", + "+--------------+", + ]; + + let mut client = flightsql_client(state.cluster()); + + let stream = client.get_catalogs().await.unwrap(); + let batches = collect_stream(stream).await; assert_batches_sorted_eq!(&expected, &batches); } @@ -243,6 +256,22 @@ async fn flightsql_jdbc() { .success() .stdout(predicate::str::contains("Running Prepared SQL Query")) .stdout(predicate::str::contains("A, B")); + + // CommandGetCatalogs output + let expected_catalogs = "**************\n\ + Catalogs:\n\ + **************\n\ + TABLE_CAT\n\ + ------------\n\ + public"; + + // Validate metadata: jdbc_client metadata + Command::from_std(std::process::Command::new(&path)) + .arg(&jdbc_url) + .arg("metadata") + .assert() + .success() + .stdout(predicate::str::contains(expected_catalogs)); } .boxed() })), @@ -251,3 +280,21 @@ async fn flightsql_jdbc() { .run() .await } + +/// Return a [`FlightSqlClient`] configured for use +fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient { + let connection = cluster.querier().querier_grpc_connection(); + let (channel, _headers) = connection.into_grpc_connection().into_parts(); + + let mut client = FlightSqlClient::new(channel); + + // Add namespace to client headers until it is fully supported by FlightSQL + let namespace = cluster.namespace(); + client.add_header("iox-namespace-name", namespace).unwrap(); + + client +} + +async fn collect_stream(stream: FlightRecordBatchStream) -> Vec { + stream.try_collect().await.expect("collecting batches") +} diff --git a/influxdb_iox/tests/jdbc_client/Main.java b/influxdb_iox/tests/jdbc_client/Main.java index 6cca662e58..4b1e16f61a 100644 --- a/influxdb_iox/tests/jdbc_client/Main.java +++ b/influxdb_iox/tests/jdbc_client/Main.java @@ -19,7 +19,7 @@ public class Main { System.err.println("# Run specified prepared query without parameters"); 
System.err.println("jdbc_client prepared_query "); System.err.println("# Run metadata tests"); - System.err.println("jdbc_client props"); + System.err.println("jdbc_client metadata"); System.exit(1); } @@ -63,9 +63,9 @@ public class Main { } break; - case "props": + case "metadata": { - run_props(url); + run_metadata(url); } break; @@ -115,17 +115,27 @@ public class Main { print_result_set(rs); } - static void run_props(String url) throws SQLException { + static void run_metadata(String url) throws SQLException { Connection conn = connect(url); System.out.println(conn.getCatalog()); DatabaseMetaData md = conn.getMetaData(); - System.out.println("isReadOnly: " + md.isReadOnly()); - System.out.println("getSearchStringEscape: " + md.getSearchStringEscape()); - System.out.println("getDriverVersion: " + md.getDriverVersion()); - System.out.println("getDatabaseProductVersion: " + md.getDatabaseProductVersion()); - System.out.println("getJDBCMajorVersion: " + md.getJDBCMajorVersion()); - System.out.println("getJDBCMinorVersion: " + md.getJDBCMinorVersion()); - System.out.println("getDriverName: " + md.getDriverName()); + // Note yet implemented + // (see https://github.com/influxdata/influxdb_iox/issues/7210 ) + + System.out.println("**************"); + System.out.println("Catalogs:"); + System.out.println("**************"); + ResultSet rs = md.getCatalogs(); + print_result_set(rs); + + //System.out.println("isReadOnly: " + md.isReadOnly()); + //System.out.println("getSearchStringEscape: " + md.getSearchStringEscape()); + //System.out.println("getDriverVersion: " + md.getDriverVersion()); + //System.out.println("getDatabaseProductVersion: " + md.getDatabaseProductVersion()); + //System.out.println("getJDBCMajorVersion: " + md.getJDBCMajorVersion()); + //System.out.println("getJDBCMinorVersion: " + md.getJDBCMinorVersion()); + //System.out.println("getDriverName: " + md.getDriverName()); + } // Print out the ResultSet in a whitespace delimited form diff --git a/influxdb_iox_client/src/client/flightsql.rs b/influxdb_iox_client/src/client/flightsql.rs index 8136357135..116c365dd0 100644 --- a/influxdb_iox_client/src/client/flightsql.rs +++ b/influxdb_iox_client/src/client/flightsql.rs @@ -29,7 +29,7 @@ use arrow_flight::{ error::{FlightError, Result}, sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, + CommandGetCatalogs, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, }, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, }; @@ -124,6 +124,22 @@ impl FlightSqlClient { self.do_get_with_cmd(msg.as_any()).await } + /// List the catalogs on this server using a [`CommandGetCatalogs`] message. + /// + /// This implementation does not support alternate endpoints + /// + /// [`CommandGetCatalogs`]: https://github.com/apache/arrow/blob/3a6fc1f9eedd41df2d8ffbcbdfbdab911ff6d82e/format/FlightSql.proto#L1125-L1140 + pub async fn get_catalogs(&mut self) -> Result { + let msg = CommandGetCatalogs {}; + self.do_get_with_cmd(msg.as_any()).await + } + + /// Implements the canonical interaction for most FlightSQL messages: + /// + /// 1. Call `GetFlightInfo` with the provided message, and get a + /// [`FlightInfo`] and embedded ticket. + /// + /// 2. Call `DoGet` with the provided ticket. 
async fn do_get_with_cmd( &mut self, cmd: arrow_flight::sql::Any, diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 7793113465..1e8bbdbf82 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -329,6 +329,15 @@ impl IOxSessionContext { pub async fn prepare_sql(&self, sql: &str) -> Result> { let logical_plan = self.plan_sql(sql).await?; + let ctx = self.child_ctx("prepare_sql"); + ctx.create_physical_plan(&logical_plan).await + } + + /// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution + pub async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + ) -> Result> { // Make nicer erorrs for unsupported SQL // (By default datafusion returns Internal Error) match &logical_plan { @@ -353,15 +362,9 @@ impl IOxSessionContext { _ => (), } - let ctx = self.child_ctx("prepare_sql"); - ctx.create_physical_plan(&logical_plan).await - } - - /// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution - pub async fn create_physical_plan(&self, plan: &LogicalPlan) -> Result> { let mut ctx = self.child_ctx("create_physical_plan"); - debug!(text=%plan.display_indent_schema(), "create_physical_plan: initial plan"); - let physical_plan = ctx.inner.state().create_physical_plan(plan).await?; + debug!(text=%logical_plan.display_indent_schema(), "create_physical_plan: initial plan"); + let physical_plan = ctx.inner.state().create_physical_plan(logical_plan).await?; ctx.recorder.event("physical plan"); debug!(text=%displayable(physical_plan.as_ref()).indent(), "create_physical_plan: plan to run"); diff --git a/iox_query/src/physical_optimizer/chunk_extraction.rs b/iox_query/src/physical_optimizer/chunk_extraction.rs index 898cd7ab8c..f5f615bda2 100644 --- a/iox_query/src/physical_optimizer/chunk_extraction.rs +++ b/iox_query/src/physical_optimizer/chunk_extraction.rs @@ -1,9 +1,13 @@ use std::sync::Arc; -use datafusion::physical_plan::{ - empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan, - ExecutionPlan, ExecutionPlanVisitor, +use datafusion::{ + error::DataFusionError, + physical_plan::{ + empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan, + ExecutionPlan, ExecutionPlanVisitor, + }, }; +use observability_deps::tracing::debug; use schema::Schema; use crate::{ @@ -22,7 +26,13 @@ use crate::{ /// [`chunks_to_physical_nodes`]: crate::provider::chunks_to_physical_nodes pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec>)> { let mut visitor = ExtractChunksVisitor::default(); - visit_execution_plan(plan, &mut visitor).ok()?; + if let Err(e) = visit_execution_plan(plan, &mut visitor) { + debug!( + %e, + "cannot extract chunks", + ); + return None; + } visitor.schema.map(|schema| (schema, visitor.chunks)) } @@ -33,16 +43,22 @@ struct ExtractChunksVisitor { } impl ExtractChunksVisitor { - fn add_chunk(&mut self, chunk: Arc) -> Result<(), ()> { + fn add_chunk(&mut self, chunk: Arc) { self.chunks.push(chunk); - Ok(()) } - fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), ()> { - let schema = Schema::try_from(exec.schema()).map_err(|_| ())?; + fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), DataFusionError> { + let schema = Schema::try_from(exec.schema()).map_err(|e| { + DataFusionError::Context( + "Schema recovery".to_owned(), + Box::new(DataFusionError::External(Box::new(e))), + ) + })?; if let Some(existing) = &self.schema { if existing != &schema { - return Err(()); + return 
Err(DataFusionError::External( + String::from("Different schema").into(), + )); } } else { self.schema = Some(schema); @@ -52,19 +68,27 @@ impl ExtractChunksVisitor { } impl ExecutionPlanVisitor for ExtractChunksVisitor { - type Error = (); + type Error = DataFusionError; fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { let plan_any = plan.as_any(); if let Some(record_batches_exec) = plan_any.downcast_ref::() { - self.add_schema_from_exec(record_batches_exec)?; + self.add_schema_from_exec(record_batches_exec) + .map_err(|e| { + DataFusionError::Context( + "add schema from RecordBatchesExec".to_owned(), + Box::new(e), + ) + })?; for chunk in record_batches_exec.chunks() { - self.add_chunk(Arc::clone(chunk))?; + self.add_chunk(Arc::clone(chunk)); } } else if let Some(parquet_exec) = plan_any.downcast_ref::() { - self.add_schema_from_exec(parquet_exec)?; + self.add_schema_from_exec(parquet_exec).map_err(|e| { + DataFusionError::Context("add schema from ParquetExec".to_owned(), Box::new(e)) + })?; for group in &parquet_exec.base_config().file_groups { for file in group { @@ -72,22 +96,32 @@ impl ExecutionPlanVisitor for ExtractChunksVisitor { .extensions .as_ref() .and_then(|any| any.downcast_ref::()) - .ok_or(())?; - self.add_chunk(Arc::clone(&ext.0))?; + .ok_or_else(|| { + DataFusionError::External( + String::from("PartitionedFileExt not found").into(), + ) + })?; + self.add_chunk(Arc::clone(&ext.0)); } } } else if let Some(empty_exec) = plan_any.downcast_ref::() { // should not produce dummy data if empty_exec.produce_one_row() { - return Err(()); + return Err(DataFusionError::External( + String::from("EmptyExec produces row").into(), + )); } - self.add_schema_from_exec(empty_exec)?; + self.add_schema_from_exec(empty_exec).map_err(|e| { + DataFusionError::Context("add schema from EmptyExec".to_owned(), Box::new(e)) + })?; } else if plan_any.downcast_ref::().is_some() { // continue visiting } else { // unsupported node - return Err(()); + return Err(DataFusionError::External( + String::from("Unsupported node").into(), + )); } Ok(true) @@ -181,7 +215,7 @@ mod tests { #[test] fn test_stop_at_other_node_types() { let chunk1 = chunk(1); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = chunks_to_physical_nodes( &schema, None, @@ -225,7 +259,13 @@ mod tests { #[track_caller] fn assert_roundtrip(schema: Schema, chunks: Vec>) { - let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2); + let plan = chunks_to_physical_nodes( + &schema.as_arrow(), + None, + chunks.clone(), + Predicate::default(), + 2, + ); let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found"); assert_eq!(schema, schema2); assert_eq!(chunk_ids(&chunks), chunk_ids(&chunks2)); diff --git a/iox_query/src/physical_optimizer/combine_chunks.rs b/iox_query/src/physical_optimizer/combine_chunks.rs index ac898e4c0f..78cd14602a 100644 --- a/iox_query/src/physical_optimizer/combine_chunks.rs +++ b/iox_query/src/physical_optimizer/combine_chunks.rs @@ -36,7 +36,7 @@ impl PhysicalOptimizerRule for CombineChunks { plan.transform_up(&|plan| { if let Some((iox_schema, chunks)) = extract_chunks(plan.as_ref()) { return Ok(Some(chunks_to_physical_nodes( - &iox_schema, + &iox_schema.as_arrow(), None, chunks, Predicate::new(), @@ -72,7 +72,7 @@ mod tests { let chunk3 = TestChunk::new("table").with_id(3); let chunk4 = TestChunk::new("table").with_id(4).with_dummy_parquet_file(); let chunk5 = 
TestChunk::new("table").with_id(5).with_dummy_parquet_file(); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = Arc::new(UnionExec::new(vec![ chunks_to_physical_nodes( &schema, diff --git a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs index 8efcf43e5b..f9790cb22c 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs @@ -64,15 +64,16 @@ impl PhysicalOptimizerRule for DedupNullColumns { } let sort_key = sort_key_builder.build(); + let arrow_schema = schema.as_arrow(); let child = chunks_to_physical_nodes( - &schema, + &arrow_schema, (!sort_key.is_empty()).then_some(&sort_key), chunks, Predicate::new(), config.execution.target_partitions, ); - let sort_exprs = arrow_sort_key_exprs(&sort_key, schema.as_arrow().as_ref()); + let sort_exprs = arrow_sort_key_exprs(&sort_key, &arrow_schema); return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs)))); } diff --git a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs index 57a5d2d840..650f9dc141 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs @@ -124,15 +124,16 @@ impl PhysicalOptimizerRule for DedupSortOrder { } let quorum_sort_key = quorum_sort_key_builder.build(); + let arrow_schema = schema.as_arrow(); let child = chunks_to_physical_nodes( - &schema, + &arrow_schema, (!quorum_sort_key.is_empty()).then_some(&quorum_sort_key), chunks, Predicate::new(), config.execution.target_partitions, ); - let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, schema.as_arrow().as_ref()); + let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &arrow_schema); return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs)))); } diff --git a/iox_query/src/physical_optimizer/dedup/partition_split.rs b/iox_query/src/physical_optimizer/dedup/partition_split.rs index 0d21fc5113..066f97febb 100644 --- a/iox_query/src/physical_optimizer/dedup/partition_split.rs +++ b/iox_query/src/physical_optimizer/dedup/partition_split.rs @@ -76,13 +76,14 @@ impl PhysicalOptimizerRule for PartitionSplit { let mut chunks_by_partition = chunks_by_partition.into_iter().collect::>(); chunks_by_partition.sort_by_key(|(p_id, _chunks)| *p_id); + let arrow_schema = schema.as_arrow(); let out = UnionExec::new( chunks_by_partition .into_iter() .map(|(_p_id, chunks)| { Arc::new(DeduplicateExec::new( chunks_to_physical_nodes( - &schema, + &arrow_schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs index edda8ac6b9..1aae525dcf 100644 --- a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs +++ b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs @@ -35,8 +35,9 @@ impl PhysicalOptimizerRule for RemoveDedup { }; if (chunks.len() < 2) && chunks.iter().all(|c| !c.may_contain_pk_duplicates()) { + let arrow_schema = schema.as_arrow(); return Ok(Some(chunks_to_physical_nodes( - &schema, + &arrow_schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/test_util.rs b/iox_query/src/physical_optimizer/dedup/test_util.rs index 19bb3e508a..25fff9131a 100644 --- a/iox_query/src/physical_optimizer/dedup/test_util.rs +++ b/iox_query/src/physical_optimizer/dedup/test_util.rs @@ -16,7 
+16,7 @@ pub fn dedup_plan(schema: Schema, chunks: Vec) -> Arc>>(); - let plan = chunks_to_physical_nodes(&schema, None, chunks, Predicate::new(), 2); + let plan = chunks_to_physical_nodes(&schema.as_arrow(), None, chunks, Predicate::new(), 2); let sort_key = schema::sort::SortKey::from_columns(schema.primary_key()); let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema.as_arrow()); diff --git a/iox_query/src/physical_optimizer/dedup/time_split.rs b/iox_query/src/physical_optimizer/dedup/time_split.rs index bed6b825cd..a54715f34e 100644 --- a/iox_query/src/physical_optimizer/dedup/time_split.rs +++ b/iox_query/src/physical_optimizer/dedup/time_split.rs @@ -63,13 +63,14 @@ impl PhysicalOptimizerRule for TimeSplit { return Ok(None); } + let arrow_schema = schema.as_arrow(); let out = UnionExec::new( groups .into_iter() .map(|chunks| { Arc::new(DeduplicateExec::new( chunks_to_physical_nodes( - &schema, + &arrow_schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs index 86da9bff2b..afa5a1e3c0 100644 --- a/iox_query/src/physical_optimizer/projection_pushdown.rs +++ b/iox_query/src/physical_optimizer/projection_pushdown.rs @@ -19,7 +19,7 @@ use datafusion::{ filter::FilterExec, projection::ProjectionExec, rewrite::TreeNodeRewritable, - sorts::sort::SortExec, + sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, union::UnionExec, ExecutionPlan, PhysicalExpr, }, @@ -32,10 +32,11 @@ use crate::provider::{DeduplicateExec, RecordBatchesExec}; pub struct ProjectionPushdown; impl PhysicalOptimizerRule for ProjectionPushdown { + #[allow(clippy::only_used_in_recursion)] fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> Result> { plan.transform_down(&|plan| { let plan_any = plan.as_any(); @@ -149,6 +150,47 @@ impl PhysicalOptimizerRule for ProjectionPushdown { }, )?; + return Ok(Some(plan)); + } else if let Some(child_sort) = child_any.downcast_ref::() + { + let sort_required_cols = child_sort + .expr() + .iter() + .map(|expr| collect_columns(&expr.expr)) + .collect::>(); + let sort_required_cols = sort_required_cols + .iter() + .flat_map(|cols| cols.iter()) + .map(|col| col.name()) + .collect::>(); + + let plan = wrap_user_into_projections( + &sort_required_cols, + &column_names, + Arc::clone(child_sort.input()), + |plan| { + Ok(Arc::new(SortPreservingMergeExec::new( + reassign_sort_exprs_columns(child_sort.expr(), &plan.schema())?, + plan, + ))) + }, + )?; + + return Ok(Some(plan)); + } else if let Some(child_proj) = child_any.downcast_ref::() { + let expr = column_indices + .iter() + .map(|idx| child_proj.expr()[*idx].clone()) + .collect(); + let plan = Arc::new(ProjectionExec::try_new( + expr, + Arc::clone(child_proj.input()), + )?); + + // need to call `optimize` directly on the plan, because otherwise we would continue with the child + // and miss the optimization of that particular new ProjectionExec + let plan = self.optimize(plan, config)?; + return Ok(Some(plan)); } else if let Some(child_dedup) = child_any.downcast_ref::() { let dedup_required_cols = child_dedup.sort_columns(); @@ -793,6 +835,135 @@ mod tests { ); } + // since `SortPreservingMergeExec` and `FilterExec` both use `wrap_user_into_projections`, we only test one variant for `SortPreservingMergeExec` + #[test] + fn test_sortpreservingmerge_projection_split() { + let schema = schema(); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", 
&schema), String::from("tag1"))], + Arc::new(SortPreservingMergeExec::new( + vec![PhysicalSortExpr { + expr: expr_col("tag2", &schema), + options: SortOptions { + descending: true, + ..Default::default() + }, + }], + Arc::new(TestExec::new(schema)), + )), + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + insta::assert_yaml_snapshot!( + OptimizationTest::new(plan, opt), + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " SortPreservingMergeExec: [tag2@1 DESC]" + - " Test" + output: + Ok: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " SortPreservingMergeExec: [tag2@1 DESC]" + - " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]" + - " Test" + "### + ); + } + + #[test] + fn test_nested_proj_inner_is_impure() { + let schema = schema(); + let plan = Arc::new(EmptyExec::new(false, schema)); + let plan = Arc::new( + ProjectionExec::try_new( + vec![ + ( + Arc::new(Literal::new(ScalarValue::from("foo"))), + String::from("tag1"), + ), + ( + Arc::new(Literal::new(ScalarValue::from("bar"))), + String::from("tag2"), + ), + ], + plan, + ) + .unwrap(), + ); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))], + plan, + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + insta::assert_yaml_snapshot!( + OptimizationTest::new(plan, opt), + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " ProjectionExec: expr=[foo as tag1, bar as tag2]" + - " EmptyExec: produce_one_row=false" + output: + Ok: + - " ProjectionExec: expr=[foo as tag1]" + - " EmptyExec: produce_one_row=false" + "### + ); + } + + #[test] + fn test_nested_proj_inner_is_pure() { + let schema = schema(); + let plan = Arc::new(EmptyExec::new(false, schema)); + let plan = Arc::new( + ProjectionExec::try_new( + vec![ + (expr_col("tag1", &plan.schema()), String::from("tag1")), + (expr_col("tag2", &plan.schema()), String::from("tag2")), + ], + plan, + ) + .unwrap(), + ); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))], + plan, + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + let test = OptimizationTest::new(plan, opt); + insta::assert_yaml_snapshot!( + test, + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]" + - " EmptyExec: produce_one_row=false" + output: + Ok: + - " EmptyExec: produce_one_row=false" + "### + ); + let empty_exec = test + .output_plan() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let expected_schema = Schema::new(vec![Field::new("tag1", DataType::Utf8, true)]); + assert_eq!(empty_exec.schema().as_ref(), &expected_schema); + } + // since `DeduplicateExec` and `FilterExec` both use `wrap_user_into_projections`, we only test a few variants for `DeduplicateExec` #[test] fn test_dedup_projection_split1() { diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index 0683dea5fd..e75e619dab 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -1089,7 +1089,7 @@ impl Deduplicater { // Create the bottom node RecordBatchesExec for this chunk let mut input = chunks_to_physical_nodes( - &input_schema, + &input_schema.as_arrow(), output_sort_key, vec![Arc::clone(&chunk)], predicate, @@ -1267,7 +1267,7 @@ impl Deduplicater { debug!("Build one scan RecordBatchesExec for all non duplicated chunks even if empty"); plans.push(chunks_to_physical_nodes( - output_schema, + &output_schema.as_arrow(), 
output_sort_key, chunks.into_no_duplicates(deduplication), predicate, @@ -1452,7 +1452,7 @@ mod test { // IOx scan operator let input = chunks_to_physical_nodes( - chunk.schema(), + &chunk.schema().as_arrow(), None, vec![Arc::clone(&chunk)], Predicate::default(), @@ -1540,7 +1540,7 @@ mod test { // IOx scan operator let input = chunks_to_physical_nodes( - chunk.schema(), + &chunk.schema().as_arrow(), None, vec![Arc::clone(&chunk)], Predicate::default(), diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index 1a7bf3a2e2..d9e73bb038 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -4,6 +4,7 @@ use crate::{ provider::record_batch_exec::RecordBatchesExec, util::arrow_sort_key_exprs, QueryChunk, QueryChunkData, }; +use arrow::datatypes::SchemaRef; use datafusion::{ datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}, physical_expr::execution_props::ExecutionProps, @@ -135,14 +136,14 @@ fn combine_sort_key( /// pushdown ([`RecordBatchesExec`] has NO builtin filter function). Delete predicates are NOT applied at all. The /// caller is responsible for wrapping the output node into appropriate filter nodes. pub fn chunks_to_physical_nodes( - iox_schema: &Schema, + schema: &SchemaRef, output_sort_key: Option<&SortKey>, chunks: Vec>, predicate: Predicate, target_partitions: usize, ) -> Arc { if chunks.is_empty() { - return Arc::new(EmptyExec::new(false, iox_schema.as_arrow())); + return Arc::new(EmptyExec::new(false, Arc::clone(schema))); } let mut record_batch_chunks: Vec> = vec![]; @@ -177,7 +178,7 @@ pub fn chunks_to_physical_nodes( if !record_batch_chunks.is_empty() { output_nodes.push(Arc::new(RecordBatchesExec::new( record_batch_chunks, - iox_schema.as_arrow(), + Arc::clone(schema), ))); } let mut parquet_chunks: Vec<_> = parquet_chunks.into_iter().collect(); @@ -202,14 +203,12 @@ pub fn chunks_to_physical_nodes( ); // Tell datafusion about the sort key, if any - let file_schema = iox_schema.as_arrow(); - let output_ordering = - sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, &file_schema)); + let output_ordering = sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, schema)); let props = ExecutionProps::new(); let filter_expr = predicate.filter_expr() .and_then(|filter_expr| { - match create_physical_expr_from_schema(&props, &filter_expr, &file_schema) { + match create_physical_expr_from_schema(&props, &filter_expr, schema) { Ok(f) => Some(f), Err(e) => { warn!(%e, ?filter_expr, "Error creating physical filter expression, can not push down"); @@ -220,7 +219,7 @@ pub fn chunks_to_physical_nodes( let base_config = FileScanConfig { object_store_url, - file_schema, + file_schema: Arc::clone(schema), file_groups, statistics: Statistics::default(), projection: None, @@ -361,7 +360,7 @@ mod tests { #[test] fn test_chunks_to_physical_nodes_empty() { - let schema = TestChunk::new("table").schema().clone(); + let schema = TestChunk::new("table").schema().as_arrow(); let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), 2); insta::assert_yaml_snapshot!( format_execution_plan(&plan), @@ -375,7 +374,7 @@ mod tests { #[test] fn test_chunks_to_physical_nodes_recordbatch() { let chunk = TestChunk::new("table"); - let schema = chunk.schema().clone(); + let schema = chunk.schema().as_arrow(); let plan = chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2); insta::assert_yaml_snapshot!( @@ -391,7 +390,7 @@ mod tests { #[test] fn 
test_chunks_to_physical_nodes_parquet_one_file() { let chunk = TestChunk::new("table").with_dummy_parquet_file(); - let schema = chunk.schema().clone(); + let schema = chunk.schema().as_arrow(); let plan = chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2); insta::assert_yaml_snapshot!( @@ -409,7 +408,7 @@ mod tests { let chunk1 = TestChunk::new("table").with_id(0).with_dummy_parquet_file(); let chunk2 = TestChunk::new("table").with_id(1).with_dummy_parquet_file(); let chunk3 = TestChunk::new("table").with_id(2).with_dummy_parquet_file(); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = chunks_to_physical_nodes( &schema, None, @@ -435,7 +434,7 @@ mod tests { let chunk2 = TestChunk::new("table") .with_id(1) .with_dummy_parquet_file_and_store("iox2://"); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = chunks_to_physical_nodes( &schema, None, @@ -458,7 +457,7 @@ mod tests { fn test_chunks_to_physical_nodes_mixed() { let chunk1 = TestChunk::new("table").with_dummy_parquet_file(); let chunk2 = TestChunk::new("table"); - let schema = chunk1.schema().clone(); + let schema = chunk1.schema().as_arrow(); let plan = chunks_to_physical_nodes( &schema, None, diff --git a/test_helpers_end_to_end/Cargo.toml b/test_helpers_end_to_end/Cargo.toml index b1200f8cf9..160ba99f44 100644 --- a/test_helpers_end_to_end/Cargo.toml +++ b/test_helpers_end_to_end/Cargo.toml @@ -9,7 +9,7 @@ license.workspace = true arrow = { workspace = true, features = ["prettyprint"] } arrow-flight = { workspace = true } arrow_util = { path = "../arrow_util" } -assert_cmd = "2.0.8" +assert_cmd = "2.0.9" bytes = "1.4" data_types = { path = "../data_types" } dml = { path = "../dml" } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 41b653fc9f..5475f89b69 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -63,7 +63,7 @@ parking_lot = { version = "0.12", features = ["arc_lock"] } parquet = { version = "34", features = ["async", "experimental"] } petgraph = { version = "0.6" } phf_shared = { version = "0.11" } -predicates = { version = "2" } +predicates = { version = "3" } prost = { version = "0.11" } prost-types = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] }
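
Usage sketch for the new `get_catalogs` client call added above: a minimal standalone caller, mirroring what the end-to-end test does through its `flightsql_client`/`collect_stream` helpers. The module path `influxdb_iox_client::flightsql::FlightSqlClient`, construction from a bare tonic `Channel`, and the `list_catalogs` helper name are assumptions for illustration, not confirmed by this patch (the tests obtain the channel from `querier_grpc_connection()` instead).

use arrow::record_batch::RecordBatch;
use arrow_flight::error::Result;
use futures::TryStreamExt;
use influxdb_iox_client::flightsql::FlightSqlClient; // assumed re-export path
use tonic::transport::Channel;

/// Lists catalog names via the new CommandGetCatalogs round trip
/// (GetFlightInfo followed by DoGet, handled inside do_get_with_cmd).
async fn list_catalogs(channel: Channel, namespace: &str) -> Result<Vec<RecordBatch>> {
    let mut client = FlightSqlClient::new(channel);

    // Namespace header is still required until FlightSQL supports it natively.
    client.add_header("iox-namespace-name", namespace).unwrap();

    // The returned batches carry a single `catalog_name` column, e.g. "public".
    let stream = client.get_catalogs().await?;
    stream.try_collect().await
}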