Merge branch 'main' into dom/wal-ref-metrics
commit 17e61126a3

@@ -80,6 +80,12 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
 
+[[package]]
+name = "anstyle"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "80c697cc33851b02ab0c26b2e8a211684fbe627ff1cc506131f35026dd7686dd"
+
 [[package]]
 name = "anyhow"
 version = "1.0.69"
@@ -360,10 +366,11 @@ dependencies = [
 
 [[package]]
 name = "assert_cmd"
-version = "2.0.8"
+version = "2.0.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9834fcc22e0874394a010230586367d4a3e9f11b560f469262678547e1d2575e"
+checksum = "c0dcbed38184f9219183fcf38beb4cdbf5df7163a6d7cd227c6ac89b7966d6fe"
 dependencies = [
+ "anstyle",
  "bstr",
  "doc-comment",
  "predicates",
@@ -4320,10 +4327,11 @@ dependencies = [
 
 [[package]]
 name = "predicates"
-version = "2.1.5"
+version = "3.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd"
+checksum = "1ba7d6ead3e3966038f68caa9fc1f860185d95a793180bbcfe0d0da47b3961ed"
 dependencies = [
+ "anstyle",
  "difflib",
  "float-cmp",
  "itertools",
@@ -4334,9 +4342,9 @@ dependencies = [
 
 [[package]]
 name = "predicates-core"
-version = "1.0.5"
+version = "1.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "72f883590242d3c6fc5bf50299011695fa6590c2c70eac95ee1bdb9a733ad1a2"
+checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174"
 
 [[package]]
 name = "predicates-tree"

@@ -4,7 +4,7 @@ use std::fmt::Display;
 use arrow_flight::sql::{
     ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any,
-    CommandPreparedStatementQuery, CommandStatementQuery,
+    CommandGetCatalogs, CommandPreparedStatementQuery, CommandStatementQuery,
 };
 use bytes::Bytes;
 use prost::Message;
@@ -14,7 +14,7 @@ use crate::error::*;
 /// Represents a prepared statement "handle". IOx passes all state
 /// required to run the prepared statement back and forth to the
-/// client so any querier instance can run it
+/// client, so any querier instance can run it
 #[derive(Debug, Clone, PartialEq)]
 pub struct PreparedStatementHandle {
     /// The raw SQL query text
@@ -57,12 +57,17 @@ impl From<PreparedStatementHandle> for Bytes {
     }
 }
 
-/// Decoded / validated FlightSQL command messages
+/// Decoded / validated FlightSQL command messages
+///
+/// Handles encoding/decoding prost::Any messages back
+/// and forth to native Rust types
 #[derive(Debug, Clone, PartialEq)]
 pub enum FlightSQLCommand {
     CommandStatementQuery(String),
     /// Run a prepared statement
     CommandPreparedStatementQuery(PreparedStatementHandle),
+    /// Get a list of the available catalogs
+    CommandGetCatalogs(),
     /// Create a prepared statement
     ActionCreatePreparedStatementRequest(String),
     /// Close a prepared statement
@@ -74,6 +79,7 @@ impl Display for FlightSQLCommand {
         match self {
             Self::CommandStatementQuery(q) => write!(f, "CommandStatementQuery{q}"),
             Self::CommandPreparedStatementQuery(h) => write!(f, "CommandPreparedStatementQuery{h}"),
+            Self::CommandGetCatalogs() => write!(f, "CommandGetCatalogs"),
             Self::ActionCreatePreparedStatementRequest(q) => {
                 write!(f, "ActionCreatePreparedStatementRequest{q}")
             }
@@ -100,6 +106,10 @@ impl FlightSQLCommand {
 
             let handle = PreparedStatementHandle::try_decode(prepared_statement_handle)?;
             Ok(Self::CommandPreparedStatementQuery(handle))
+        } else if let Some(decoded_cmd) = Any::unpack::<CommandGetCatalogs>(&msg)? {
+            let CommandGetCatalogs {} = decoded_cmd;
+
+            Ok(Self::CommandGetCatalogs())
         } else if let Some(decoded_cmd) = Any::unpack::<ActionCreatePreparedStatementRequest>(&msg)?
         {
             let ActionCreatePreparedStatementRequest { query } = decoded_cmd;
@@ -131,6 +141,7 @@ impl FlightSQLCommand {
                     prepared_statement_handle,
                 })
             }
+            FlightSQLCommand::CommandGetCatalogs() => Any::pack(&CommandGetCatalogs {}),
            FlightSQLCommand::ActionCreatePreparedStatementRequest(query) => {
                 Any::pack(&ActionCreatePreparedStatementRequest { query })
             }

@@ -1,13 +1,13 @@
 //! FlightSQL handling
 use std::sync::Arc;
 
-use arrow::{error::ArrowError, ipc::writer::IpcWriteOptions};
+use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions};
 use arrow_flight::{
     sql::{ActionCreatePreparedStatementResult, Any},
     IpcMessage, SchemaAsIpc,
 };
 use bytes::Bytes;
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan};
 use iox_query::{exec::IOxSessionContext, QueryNamespace};
 use observability_deps::tracing::debug;
 use prost::Message;
@@ -35,12 +35,17 @@ impl FlightSQLPlanner {
 
         match cmd {
             FlightSQLCommand::CommandStatementQuery(query) => {
-                Self::get_schema_for_query(&query, ctx).await
+                get_schema_for_query(&query, ctx).await
             }
             FlightSQLCommand::CommandPreparedStatementQuery(handle) => {
-                Self::get_schema_for_query(handle.query(), ctx).await
+                get_schema_for_query(handle.query(), ctx).await
             }
-            _ => ProtocolSnafu {
+            FlightSQLCommand::CommandGetCatalogs() => {
+                let plan = plan_get_catalogs(ctx).await?;
+                get_schema_for_plan(plan)
+            }
+            FlightSQLCommand::ActionCreatePreparedStatementRequest(_)
+            | FlightSQLCommand::ActionClosePreparedStatementRequest(_) => ProtocolSnafu {
                 cmd: format!("{cmd:?}"),
                 method: "GetFlightInfo",
             }
@@ -48,24 +53,6 @@ impl FlightSQLPlanner {
         }
     }
 
-    /// Return the schema for the specified query
-    ///
-    /// returns: IPC encoded (schema_bytes) for this query
-    async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result<Bytes> {
-        // gather real schema, but only
-        let logical_plan = ctx.plan_sql(query).await?;
-        let schema = arrow::datatypes::Schema::from(logical_plan.schema().as_ref());
-        let options = IpcWriteOptions::default();
-
-        // encode the schema into the correct form
-        let message: Result<IpcMessage, ArrowError> =
-            SchemaAsIpc::new(&schema, &options).try_into();
-
-        let IpcMessage(schema) = message?;
-
-        Ok(schema)
-    }
-
     /// Returns a plan that computes results requested in msg
     pub async fn do_get(
         namespace_name: impl Into<String>,
@@ -86,7 +73,14 @@ impl FlightSQLPlanner {
                 debug!(%query, "Planning FlightSQL prepared query");
                 Ok(ctx.prepare_sql(query).await?)
             }
-            _ => ProtocolSnafu {
+            FlightSQLCommand::CommandGetCatalogs() => {
+                debug!("Planning GetCatalogs query");
+                let plan = plan_get_catalogs(ctx).await?;
+                Ok(ctx.create_physical_plan(&plan).await?)
+            }
+
+            FlightSQLCommand::ActionClosePreparedStatementRequest(_)
+            | FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => ProtocolSnafu {
                 cmd: format!("{cmd:?}"),
                 method: "DoGet",
             }
@@ -114,7 +108,7 @@ impl FlightSQLPlanner {
                 // see https://github.com/apache/arrow-datafusion/pull/4701
                 let parameter_schema = vec![];
 
-                let dataset_schema = Self::get_schema_for_query(&query, ctx).await?;
+                let dataset_schema = get_schema_for_query(&query, ctx).await?;
                 let handle = PreparedStatementHandle::new(query);
 
                 let result = ActionCreatePreparedStatementResult {
@@ -141,3 +135,41 @@ impl FlightSQLPlanner {
         }
     }
 }
+
+/// Return the schema for the specified query
+///
+/// returns: IPC encoded (schema_bytes) for this query
+async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result<Bytes> {
+    get_schema_for_plan(ctx.plan_sql(query).await?)
+}
+
+/// Return the schema for the specified logical plan
+///
+/// returns: IPC encoded (schema_bytes) for this query
+fn get_schema_for_plan(logical_plan: LogicalPlan) -> Result<Bytes> {
+    // gather real schema, but only
+    let schema = Schema::from(logical_plan.schema().as_ref());
+    encode_schema(&schema)
+}
+
+/// Encodes the schema IPC encoded (schema_bytes)
+fn encode_schema(schema: &Schema) -> Result<Bytes> {
+    let options = IpcWriteOptions::default();
+
+    // encode the schema into the correct form
+    let message: Result<IpcMessage, ArrowError> = SchemaAsIpc::new(schema, &options).try_into();
+
+    let IpcMessage(schema) = message?;
+
+    Ok(schema)
+}
+
+/// Return a `LogicalPlan` for GetCatalogs
+///
+/// In the future this could be made more efficient by building the
+/// response directly from the IOx catalog rather than running an
+/// entire DataFusion plan.
+async fn plan_get_catalogs(ctx: &IOxSessionContext) -> Result<LogicalPlan> {
+    let query = "SELECT DISTINCT table_catalog AS catalog_name FROM information_schema.tables ORDER BY table_catalog";
+    Ok(ctx.plan_sql(query).await?)
+}
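
Not part of the patch: a hypothetical sketch of how the pieces added above fit together when a `CommandGetCatalogs` request arrives. It reuses only names introduced in this diff (`plan_get_catalogs`, `get_schema_for_plan`, `IOxSessionContext::create_physical_plan`), elides the crate's error handling, and will not compile outside the IOx crates.

```rust
// Illustrative only -- mirrors the GetFlightInfo / DoGet arms added above.
async fn handle_get_catalogs(ctx: &IOxSessionContext) {
    // GetFlightInfo: plan the information_schema query and return its IPC-encoded schema
    let plan = plan_get_catalogs(ctx).await.expect("logical plan");
    let _schema_bytes = get_schema_for_plan(plan).expect("IPC schema");

    // DoGet: plan again, then lower the LogicalPlan to an ExecutionPlan for execution
    let plan = plan_get_catalogs(ctx).await.expect("logical plan");
    let _physical = ctx.create_physical_plan(&plan).await.expect("physical plan");
}
```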

@@ -83,10 +83,10 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
 [dev-dependencies]
 # In alphabetical order
 arrow_util = { path = "../arrow_util" }
-assert_cmd = "2.0.8"
+assert_cmd = "2.0.9"
 async-trait = "0.1"
 predicate = { path = "../predicate" }
-predicates = "2.1.0"
+predicates = "3.0.1"
 serde = "1.0.156"
 test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
 test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }

@@ -1,5 +1,7 @@
 use std::path::PathBuf;
 
+use arrow::record_batch::RecordBatch;
+use arrow_flight::decode::FlightRecordBatchStream;
 use arrow_util::assert_batches_sorted_eq;
 use assert_cmd::Command;
 use datafusion::common::assert_contains;
@@ -37,22 +39,10 @@ async fn flightsql_adhoc_query() {
                        "+------+------+--------------------------------+-----+",
                    ];
 
-                    let connection = state.cluster().querier().querier_grpc_connection();
-                    let (channel, _headers) = connection.into_grpc_connection().into_parts();
+                    let mut client = flightsql_client(state.cluster());
 
-                    let mut client = FlightSqlClient::new(channel);
-
-                    // Add namespace to client headers until it is fully supported by FlightSQL
-                    let namespace = state.cluster().namespace();
-                    client.add_header("iox-namespace-name", namespace).unwrap();
-
-                    let batches: Vec<_> = client
-                        .query(sql)
-                        .await
-                        .expect("ran SQL query")
-                        .try_collect()
-                        .await
-                        .expect("got batches");
+                    let stream = client.query(sql).await.unwrap();
+                    let batches = collect_stream(stream).await;
 
                     assert_batches_sorted_eq!(&expected, &batches);
                 }
@@ -84,14 +74,7 @@ async fn flightsql_adhoc_query_error() {
                async move {
                    let sql = String::from("select * from incorrect_table");
 
-                    let connection = state.cluster().querier().querier_grpc_connection();
-                    let (channel, _headers) = connection.into_grpc_connection().into_parts();
-
-                    let mut client = FlightSqlClient::new(channel);
-
-                    // Add namespace to client headers until it is fully supported by FlightSQL
-                    let namespace = state.cluster().namespace();
-                    client.add_header("iox-namespace-name", namespace).unwrap();
+                    let mut client = flightsql_client(state.cluster());
 
                     let err = client.query(sql).await.unwrap_err();
 
@@ -138,24 +121,54 @@ async fn flightsql_prepared_query() {
                        "+------+------+--------------------------------+-----+",
                    ];
 
-                    let connection = state.cluster().querier().querier_grpc_connection();
-                    let (channel, _headers) = connection.into_grpc_connection().into_parts();
-
-                    let mut client = FlightSqlClient::new(channel);
-
-                    // Add namespace to client headers until it is fully supported by FlightSQL
-                    let namespace = state.cluster().namespace();
-                    client.add_header("iox-namespace-name", namespace).unwrap();
+                    let mut client = flightsql_client(state.cluster());
 
                     let handle = client.prepare(sql).await.unwrap();
+                    let stream = client.execute(handle).await.unwrap();
 
-                    let batches: Vec<_> = client
-                        .execute(handle)
-                        .await
-                        .expect("ran SQL query")
-                        .try_collect()
-                        .await
-                        .expect("got batches");
+                    let batches = collect_stream(stream).await;
 
                     assert_batches_sorted_eq!(&expected, &batches);
                 }
                .boxed()
            })),
        ],
    )
    .run()
    .await
 }
+
+#[tokio::test]
+async fn flightsql_get_catalogs() {
+    test_helpers::maybe_start_logging();
+    let database_url = maybe_skip_integration!();
+
+    let table_name = "the_table";
+
+    // Set up the cluster  ====================================
+    let mut cluster = MiniCluster::create_shared2(database_url).await;
+
+    StepTest::new(
+        &mut cluster,
+        vec![
+            Step::WriteLineProtocol(format!(
+                "{table_name},tag1=A,tag2=B val=42i 123456\n\
+                 {table_name},tag1=A,tag2=C val=43i 123457"
+            )),
+            Step::Custom(Box::new(move |state: &mut StepTestState| {
+                async move {
+                    let expected = vec![
+                        "+--------------+",
+                        "| catalog_name |",
+                        "+--------------+",
+                        "| public       |",
+                        "+--------------+",
+                    ];
+
+                    let mut client = flightsql_client(state.cluster());
+
+                    let stream = client.get_catalogs().await.unwrap();
+                    let batches = collect_stream(stream).await;
+
+                    assert_batches_sorted_eq!(&expected, &batches);
+                }
@@ -243,6 +256,22 @@ async fn flightsql_jdbc() {
                        .success()
                        .stdout(predicate::str::contains("Running Prepared SQL Query"))
                        .stdout(predicate::str::contains("A, B"));
+
+                    // CommandGetCatalogs output
+                    let expected_catalogs = "**************\n\
+                                             Catalogs:\n\
+                                             **************\n\
+                                             TABLE_CAT\n\
+                                             ------------\n\
+                                             public";
+
+                    // Validate metadata: jdbc_client <url> metadata
+                    Command::from_std(std::process::Command::new(&path))
+                        .arg(&jdbc_url)
+                        .arg("metadata")
+                        .assert()
+                        .success()
+                        .stdout(predicate::str::contains(expected_catalogs));
                 }
                .boxed()
            })),
@@ -251,3 +280,21 @@ async fn flightsql_jdbc() {
    .run()
    .await
 }
+
+/// Return a [`FlightSqlClient`] configured for use
+fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient {
+    let connection = cluster.querier().querier_grpc_connection();
+    let (channel, _headers) = connection.into_grpc_connection().into_parts();
+
+    let mut client = FlightSqlClient::new(channel);
+
+    // Add namespace to client headers until it is fully supported by FlightSQL
+    let namespace = cluster.namespace();
+    client.add_header("iox-namespace-name", namespace).unwrap();
+
+    client
+}
+
+async fn collect_stream(stream: FlightRecordBatchStream) -> Vec<RecordBatch> {
+    stream.try_collect().await.expect("collecting batches")
+}

@@ -19,7 +19,7 @@ public class Main {
         System.err.println("# Run specified prepared query without parameters");
         System.err.println("jdbc_client <url> prepared_query <sql>");
         System.err.println("# Run metadata tests");
-        System.err.println("jdbc_client <url> props");
+        System.err.println("jdbc_client <url> metadata");
         System.exit(1);
     }
 
@@ -63,9 +63,9 @@ public class Main {
                 }
                 break;
 
-            case "props":
+            case "metadata":
                 {
-                    run_props(url);
+                    run_metadata(url);
                 }
                 break;
 
@@ -115,17 +115,27 @@ public class Main {
         print_result_set(rs);
     }
 
-    static void run_props(String url) throws SQLException {
+    static void run_metadata(String url) throws SQLException {
         Connection conn = connect(url);
         System.out.println(conn.getCatalog());
         DatabaseMetaData md = conn.getMetaData();
-        System.out.println("isReadOnly: " + md.isReadOnly());
-        System.out.println("getSearchStringEscape: " + md.getSearchStringEscape());
-        System.out.println("getDriverVersion: " + md.getDriverVersion());
-        System.out.println("getDatabaseProductVersion: " + md.getDatabaseProductVersion());
-        System.out.println("getJDBCMajorVersion: " + md.getJDBCMajorVersion());
-        System.out.println("getJDBCMinorVersion: " + md.getJDBCMinorVersion());
-        System.out.println("getDriverName: " + md.getDriverName());
+        // Note yet implemented
+        // (see https://github.com/influxdata/influxdb_iox/issues/7210 )
+
+        System.out.println("**************");
+        System.out.println("Catalogs:");
+        System.out.println("**************");
+        ResultSet rs = md.getCatalogs();
+        print_result_set(rs);
+
+        //System.out.println("isReadOnly: " + md.isReadOnly());
+        //System.out.println("getSearchStringEscape: " + md.getSearchStringEscape());
+        //System.out.println("getDriverVersion: " + md.getDriverVersion());
+        //System.out.println("getDatabaseProductVersion: " + md.getDatabaseProductVersion());
+        //System.out.println("getJDBCMajorVersion: " + md.getJDBCMajorVersion());
+        //System.out.println("getJDBCMinorVersion: " + md.getJDBCMinorVersion());
+        //System.out.println("getDriverName: " + md.getDriverName());
+
     }
 
     // Print out the ResultSet in a whitespace delimited form

@@ -29,7 +29,7 @@ use arrow_flight::{
     error::{FlightError, Result},
     sql::{
         ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
-        CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt,
+        CommandGetCatalogs, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt,
     },
     Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket,
 };
@@ -124,6 +124,22 @@ impl FlightSqlClient {
         self.do_get_with_cmd(msg.as_any()).await
     }
 
+    /// List the catalogs on this server using a [`CommandGetCatalogs`] message.
+    ///
+    /// This implementation does not support alternate endpoints
+    ///
+    /// [`CommandGetCatalogs`]: https://github.com/apache/arrow/blob/3a6fc1f9eedd41df2d8ffbcbdfbdab911ff6d82e/format/FlightSql.proto#L1125-L1140
+    pub async fn get_catalogs(&mut self) -> Result<FlightRecordBatchStream> {
+        let msg = CommandGetCatalogs {};
+        self.do_get_with_cmd(msg.as_any()).await
+    }
+
     /// Implements the canonical interaction for most FlightSQL messages:
     ///
     /// 1. Call `GetFlightInfo` with the provided message, and get a
     ///    [`FlightInfo`] and embedded ticket.
     ///
     /// 2. Call `DoGet` with the provided ticket.
     async fn do_get_with_cmd(
         &mut self,
         cmd: arrow_flight::sql::Any,
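
Not part of the patch: a hypothetical caller-side sketch of the new `get_catalogs` method, modeled on the end-to-end test helpers above (`flightsql_client` / `collect_stream`). The namespace value is a placeholder, and the snippet assumes the `FlightSqlClient` defined in this file.

```rust
use arrow::record_batch::RecordBatch;
use futures::TryStreamExt;

// `client` is the FlightSqlClient from this file; "my_namespace" is a placeholder.
async fn list_catalogs(mut client: FlightSqlClient) -> Vec<RecordBatch> {
    // Namespace still travels as a header until FlightSQL supports it natively
    client.add_header("iox-namespace-name", "my_namespace").unwrap();

    // Internally: GetFlightInfo with CommandGetCatalogs, then DoGet with the returned ticket
    let stream = client.get_catalogs().await.expect("get_catalogs");
    stream.try_collect().await.expect("collect record batches")
}
```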

@@ -329,6 +329,15 @@ impl IOxSessionContext {
     pub async fn prepare_sql(&self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
         let logical_plan = self.plan_sql(sql).await?;
 
+        let ctx = self.child_ctx("prepare_sql");
+        ctx.create_physical_plan(&logical_plan).await
+    }
+
+    /// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution
+    pub async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
         // Make nicer erorrs for unsupported SQL
         // (By default datafusion returns Internal Error)
         match &logical_plan {
@@ -353,15 +362,9 @@ impl IOxSessionContext {
             _ => (),
         }
 
-        let ctx = self.child_ctx("prepare_sql");
-        ctx.create_physical_plan(&logical_plan).await
-    }
-
-    /// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution
-    pub async fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
         let mut ctx = self.child_ctx("create_physical_plan");
-        debug!(text=%plan.display_indent_schema(), "create_physical_plan: initial plan");
-        let physical_plan = ctx.inner.state().create_physical_plan(plan).await?;
+        debug!(text=%logical_plan.display_indent_schema(), "create_physical_plan: initial plan");
+        let physical_plan = ctx.inner.state().create_physical_plan(logical_plan).await?;
 
         ctx.recorder.event("physical plan");
         debug!(text=%displayable(physical_plan.as_ref()).indent(), "create_physical_plan: plan to run");
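
Not part of the patch: a short sketch of why `create_physical_plan` was split out of `prepare_sql`. Callers that already hold a `LogicalPlan` (such as `plan_get_catalogs` in the FlightSQL planner) can now skip SQL parsing; the function below is illustrative and assumes an `IOxSessionContext` as used above.

```rust
async fn plan_two_ways(ctx: &IOxSessionContext) {
    // One-shot convenience: SQL text straight to an ExecutionPlan
    let _physical = ctx.prepare_sql("SELECT 1").await.expect("prepare_sql");

    // Two-step path: build (or receive) a LogicalPlan first, then lower it separately
    let logical = ctx.plan_sql("SELECT 1").await.expect("plan_sql");
    let _physical = ctx.create_physical_plan(&logical).await.expect("create_physical_plan");
}
```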

@@ -1,9 +1,13 @@
 use std::sync::Arc;
 
-use datafusion::physical_plan::{
-    empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan,
-    ExecutionPlan, ExecutionPlanVisitor,
+use datafusion::{
+    error::DataFusionError,
+    physical_plan::{
+        empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan,
+        ExecutionPlan, ExecutionPlanVisitor,
+    },
 };
+use observability_deps::tracing::debug;
 use schema::Schema;
 
 use crate::{
@@ -22,7 +26,13 @@ use crate::{
 /// [`chunks_to_physical_nodes`]: crate::provider::chunks_to_physical_nodes
 pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec<Arc<dyn QueryChunk>>)> {
     let mut visitor = ExtractChunksVisitor::default();
-    visit_execution_plan(plan, &mut visitor).ok()?;
+    if let Err(e) = visit_execution_plan(plan, &mut visitor) {
+        debug!(
+            %e,
+            "cannot extract chunks",
+        );
+        return None;
+    }
     visitor.schema.map(|schema| (schema, visitor.chunks))
 }
 
@@ -33,16 +43,22 @@ struct ExtractChunksVisitor {
 }
 
 impl ExtractChunksVisitor {
-    fn add_chunk(&mut self, chunk: Arc<dyn QueryChunk>) -> Result<(), ()> {
+    fn add_chunk(&mut self, chunk: Arc<dyn QueryChunk>) {
         self.chunks.push(chunk);
-        Ok(())
     }
 
-    fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), ()> {
-        let schema = Schema::try_from(exec.schema()).map_err(|_| ())?;
+    fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), DataFusionError> {
+        let schema = Schema::try_from(exec.schema()).map_err(|e| {
+            DataFusionError::Context(
+                "Schema recovery".to_owned(),
+                Box::new(DataFusionError::External(Box::new(e))),
+            )
+        })?;
         if let Some(existing) = &self.schema {
             if existing != &schema {
-                return Err(());
+                return Err(DataFusionError::External(
+                    String::from("Different schema").into(),
+                ));
             }
         } else {
             self.schema = Some(schema);
@@ -52,19 +68,27 @@ impl ExtractChunksVisitor {
 }
 
 impl ExecutionPlanVisitor for ExtractChunksVisitor {
-    type Error = ();
+    type Error = DataFusionError;
 
     fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
         let plan_any = plan.as_any();
 
         if let Some(record_batches_exec) = plan_any.downcast_ref::<RecordBatchesExec>() {
-            self.add_schema_from_exec(record_batches_exec)?;
+            self.add_schema_from_exec(record_batches_exec)
+                .map_err(|e| {
+                    DataFusionError::Context(
+                        "add schema from RecordBatchesExec".to_owned(),
+                        Box::new(e),
+                    )
+                })?;
+
             for chunk in record_batches_exec.chunks() {
-                self.add_chunk(Arc::clone(chunk))?;
+                self.add_chunk(Arc::clone(chunk));
             }
         } else if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() {
-            self.add_schema_from_exec(parquet_exec)?;
+            self.add_schema_from_exec(parquet_exec).map_err(|e| {
+                DataFusionError::Context("add schema from ParquetExec".to_owned(), Box::new(e))
+            })?;
+
             for group in &parquet_exec.base_config().file_groups {
                 for file in group {
@@ -72,22 +96,32 @@ impl ExecutionPlanVisitor for ExtractChunksVisitor {
                         .extensions
                         .as_ref()
                         .and_then(|any| any.downcast_ref::<PartitionedFileExt>())
-                        .ok_or(())?;
-                    self.add_chunk(Arc::clone(&ext.0))?;
+                        .ok_or_else(|| {
+                            DataFusionError::External(
+                                String::from("PartitionedFileExt not found").into(),
+                            )
+                        })?;
+                    self.add_chunk(Arc::clone(&ext.0));
                 }
             }
         } else if let Some(empty_exec) = plan_any.downcast_ref::<EmptyExec>() {
             // should not produce dummy data
             if empty_exec.produce_one_row() {
-                return Err(());
+                return Err(DataFusionError::External(
+                    String::from("EmptyExec produces row").into(),
+                ));
             }
 
-            self.add_schema_from_exec(empty_exec)?;
+            self.add_schema_from_exec(empty_exec).map_err(|e| {
+                DataFusionError::Context("add schema from EmptyExec".to_owned(), Box::new(e))
+            })?;
         } else if plan_any.downcast_ref::<UnionExec>().is_some() {
             // continue visiting
         } else {
             // unsupported node
-            return Err(());
+            return Err(DataFusionError::External(
+                String::from("Unsupported node").into(),
+            ));
         }
 
         Ok(true)
@@ -181,7 +215,7 @@ mod tests {
     #[test]
     fn test_stop_at_other_node_types() {
         let chunk1 = chunk(1);
-        let schema = chunk1.schema().clone();
+        let schema = chunk1.schema().as_arrow();
         let plan = chunks_to_physical_nodes(
             &schema,
             None,
@@ -225,7 +259,13 @@ mod tests {
 
     #[track_caller]
     fn assert_roundtrip(schema: Schema, chunks: Vec<Arc<dyn QueryChunk>>) {
-        let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2);
+        let plan = chunks_to_physical_nodes(
+            &schema.as_arrow(),
+            None,
+            chunks.clone(),
+            Predicate::default(),
+            2,
+        );
         let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found");
         assert_eq!(schema, schema2);
         assert_eq!(chunk_ids(&chunks), chunk_ids(&chunks2));

@@ -36,7 +36,7 @@ impl PhysicalOptimizerRule for CombineChunks {
         plan.transform_up(&|plan| {
             if let Some((iox_schema, chunks)) = extract_chunks(plan.as_ref()) {
                 return Ok(Some(chunks_to_physical_nodes(
-                    &iox_schema,
+                    &iox_schema.as_arrow(),
                     None,
                     chunks,
                     Predicate::new(),
@@ -72,7 +72,7 @@ mod tests {
         let chunk3 = TestChunk::new("table").with_id(3);
         let chunk4 = TestChunk::new("table").with_id(4).with_dummy_parquet_file();
         let chunk5 = TestChunk::new("table").with_id(5).with_dummy_parquet_file();
-        let schema = chunk1.schema().clone();
+        let schema = chunk1.schema().as_arrow();
         let plan = Arc::new(UnionExec::new(vec![
             chunks_to_physical_nodes(
                 &schema,

@@ -64,15 +64,16 @@ impl PhysicalOptimizerRule for DedupNullColumns {
             }
 
             let sort_key = sort_key_builder.build();
+            let arrow_schema = schema.as_arrow();
             let child = chunks_to_physical_nodes(
-                &schema,
+                &arrow_schema,
                 (!sort_key.is_empty()).then_some(&sort_key),
                 chunks,
                 Predicate::new(),
                 config.execution.target_partitions,
             );
 
-            let sort_exprs = arrow_sort_key_exprs(&sort_key, schema.as_arrow().as_ref());
+            let sort_exprs = arrow_sort_key_exprs(&sort_key, &arrow_schema);
             return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
         }
 

@@ -124,15 +124,16 @@ impl PhysicalOptimizerRule for DedupSortOrder {
             }
 
             let quorum_sort_key = quorum_sort_key_builder.build();
+            let arrow_schema = schema.as_arrow();
             let child = chunks_to_physical_nodes(
-                &schema,
+                &arrow_schema,
                 (!quorum_sort_key.is_empty()).then_some(&quorum_sort_key),
                 chunks,
                 Predicate::new(),
                 config.execution.target_partitions,
             );
 
-            let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, schema.as_arrow().as_ref());
+            let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &arrow_schema);
             return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
         }
 

@@ -76,13 +76,14 @@ impl PhysicalOptimizerRule for PartitionSplit {
             let mut chunks_by_partition = chunks_by_partition.into_iter().collect::<Vec<_>>();
             chunks_by_partition.sort_by_key(|(p_id, _chunks)| *p_id);
 
+            let arrow_schema = schema.as_arrow();
             let out = UnionExec::new(
                 chunks_by_partition
                     .into_iter()
                     .map(|(_p_id, chunks)| {
                         Arc::new(DeduplicateExec::new(
                             chunks_to_physical_nodes(
-                                &schema,
+                                &arrow_schema,
                                 None,
                                 chunks,
                                 Predicate::new(),

@@ -35,8 +35,9 @@ impl PhysicalOptimizerRule for RemoveDedup {
             };
 
             if (chunks.len() < 2) && chunks.iter().all(|c| !c.may_contain_pk_duplicates()) {
+                let arrow_schema = schema.as_arrow();
                 return Ok(Some(chunks_to_physical_nodes(
-                    &schema,
+                    &arrow_schema,
                     None,
                     chunks,
                     Predicate::new(),

@@ -16,7 +16,7 @@ pub fn dedup_plan(schema: Schema, chunks: Vec<TestChunk>) -> Arc<dyn ExecutionPl
         .into_iter()
         .map(|c| Arc::new(c) as _)
         .collect::<Vec<Arc<dyn QueryChunk>>>();
-    let plan = chunks_to_physical_nodes(&schema, None, chunks, Predicate::new(), 2);
+    let plan = chunks_to_physical_nodes(&schema.as_arrow(), None, chunks, Predicate::new(), 2);
 
     let sort_key = schema::sort::SortKey::from_columns(schema.primary_key());
     let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema.as_arrow());

@@ -63,13 +63,14 @@ impl PhysicalOptimizerRule for TimeSplit {
                 return Ok(None);
             }
 
+            let arrow_schema = schema.as_arrow();
             let out = UnionExec::new(
                 groups
                     .into_iter()
                     .map(|chunks| {
                         Arc::new(DeduplicateExec::new(
                             chunks_to_physical_nodes(
-                                &schema,
+                                &arrow_schema,
                                 None,
                                 chunks,
                                 Predicate::new(),

@@ -19,7 +19,7 @@ use datafusion::{
         filter::FilterExec,
         projection::ProjectionExec,
         rewrite::TreeNodeRewritable,
-        sorts::sort::SortExec,
+        sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
         union::UnionExec,
         ExecutionPlan, PhysicalExpr,
     },
@@ -32,10 +32,11 @@ use crate::provider::{DeduplicateExec, RecordBatchesExec};
 pub struct ProjectionPushdown;
 
 impl PhysicalOptimizerRule for ProjectionPushdown {
+    #[allow(clippy::only_used_in_recursion)]
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_down(&|plan| {
             let plan_any = plan.as_any();
@@ -149,6 +150,47 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
                         },
                     )?;
 
                     return Ok(Some(plan));
+                } else if let Some(child_sort) = child_any.downcast_ref::<SortPreservingMergeExec>()
+                {
+                    let sort_required_cols = child_sort
+                        .expr()
+                        .iter()
+                        .map(|expr| collect_columns(&expr.expr))
+                        .collect::<Vec<_>>();
+                    let sort_required_cols = sort_required_cols
+                        .iter()
+                        .flat_map(|cols| cols.iter())
+                        .map(|col| col.name())
+                        .collect::<HashSet<_>>();
+
+                    let plan = wrap_user_into_projections(
+                        &sort_required_cols,
+                        &column_names,
+                        Arc::clone(child_sort.input()),
+                        |plan| {
+                            Ok(Arc::new(SortPreservingMergeExec::new(
+                                reassign_sort_exprs_columns(child_sort.expr(), &plan.schema())?,
+                                plan,
+                            )))
+                        },
+                    )?;
+
+                    return Ok(Some(plan));
+                } else if let Some(child_proj) = child_any.downcast_ref::<ProjectionExec>() {
+                    let expr = column_indices
+                        .iter()
+                        .map(|idx| child_proj.expr()[*idx].clone())
+                        .collect();
+                    let plan = Arc::new(ProjectionExec::try_new(
+                        expr,
+                        Arc::clone(child_proj.input()),
+                    )?);
+
+                    // need to call `optimize` directly on the plan, because otherwise we would continue with the child
+                    // and miss the optimization of that particular new ProjectionExec
+                    let plan = self.optimize(plan, config)?;
+
+                    return Ok(Some(plan));
                 } else if let Some(child_dedup) = child_any.downcast_ref::<DeduplicateExec>() {
                     let dedup_required_cols = child_dedup.sort_columns();
@@ -793,6 +835,135 @@ mod tests {
         );
     }
 
+    // since `SortPreservingMergeExec` and `FilterExec` both use `wrap_user_into_projections`, we only test one variant for `SortPreservingMergeExec`
+    #[test]
+    fn test_sortpreservingmerge_projection_split() {
+        let schema = schema();
+        let plan = Arc::new(
+            ProjectionExec::try_new(
+                vec![(expr_col("tag1", &schema), String::from("tag1"))],
+                Arc::new(SortPreservingMergeExec::new(
+                    vec![PhysicalSortExpr {
+                        expr: expr_col("tag2", &schema),
+                        options: SortOptions {
+                            descending: true,
+                            ..Default::default()
+                        },
+                    }],
+                    Arc::new(TestExec::new(schema)),
+                )),
+            )
+            .unwrap(),
+        );
+        let opt = ProjectionPushdown::default();
+        insta::assert_yaml_snapshot!(
+            OptimizationTest::new(plan, opt),
+            @r###"
+        ---
+        input:
+          - " ProjectionExec: expr=[tag1@0 as tag1]"
+          - " SortPreservingMergeExec: [tag2@1 DESC]"
+          - " Test"
+        output:
+          Ok:
+            - " ProjectionExec: expr=[tag1@0 as tag1]"
+            - " SortPreservingMergeExec: [tag2@1 DESC]"
+            - " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]"
+            - " Test"
+        "###
+        );
+    }
+
+    #[test]
+    fn test_nested_proj_inner_is_impure() {
+        let schema = schema();
+        let plan = Arc::new(EmptyExec::new(false, schema));
+        let plan = Arc::new(
+            ProjectionExec::try_new(
+                vec![
+                    (
+                        Arc::new(Literal::new(ScalarValue::from("foo"))),
+                        String::from("tag1"),
+                    ),
+                    (
+                        Arc::new(Literal::new(ScalarValue::from("bar"))),
+                        String::from("tag2"),
+                    ),
+                ],
+                plan,
+            )
+            .unwrap(),
+        );
+        let plan = Arc::new(
+            ProjectionExec::try_new(
+                vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))],
+                plan,
+            )
+            .unwrap(),
+        );
+        let opt = ProjectionPushdown::default();
+        insta::assert_yaml_snapshot!(
+            OptimizationTest::new(plan, opt),
+            @r###"
+        ---
+        input:
+          - " ProjectionExec: expr=[tag1@0 as tag1]"
+          - " ProjectionExec: expr=[foo as tag1, bar as tag2]"
+          - " EmptyExec: produce_one_row=false"
+        output:
+          Ok:
+            - " ProjectionExec: expr=[foo as tag1]"
+            - " EmptyExec: produce_one_row=false"
+        "###
+        );
+    }
+
+    #[test]
+    fn test_nested_proj_inner_is_pure() {
+        let schema = schema();
+        let plan = Arc::new(EmptyExec::new(false, schema));
+        let plan = Arc::new(
+            ProjectionExec::try_new(
+                vec![
+                    (expr_col("tag1", &plan.schema()), String::from("tag1")),
+                    (expr_col("tag2", &plan.schema()), String::from("tag2")),
+                ],
+                plan,
+            )
+            .unwrap(),
+        );
+        let plan = Arc::new(
+            ProjectionExec::try_new(
+                vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))],
+                plan,
+            )
+            .unwrap(),
+        );
+        let opt = ProjectionPushdown::default();
+        let test = OptimizationTest::new(plan, opt);
+        insta::assert_yaml_snapshot!(
+            test,
+            @r###"
+        ---
+        input:
+          - " ProjectionExec: expr=[tag1@0 as tag1]"
+          - " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]"
+          - " EmptyExec: produce_one_row=false"
+        output:
+          Ok:
+            - " EmptyExec: produce_one_row=false"
+        "###
+        );
+        let empty_exec = test
+            .output_plan()
+            .unwrap()
+            .as_any()
+            .downcast_ref::<EmptyExec>()
+            .unwrap();
+        let expected_schema = Schema::new(vec![Field::new("tag1", DataType::Utf8, true)]);
+        assert_eq!(empty_exec.schema().as_ref(), &expected_schema);
+    }
+
     // since `DeduplicateExec` and `FilterExec` both use `wrap_user_into_projections`, we only test a few variants for `DeduplicateExec`
     #[test]
     fn test_dedup_projection_split1() {

@@ -1089,7 +1089,7 @@ impl Deduplicater {
 
         // Create the bottom node RecordBatchesExec for this chunk
         let mut input = chunks_to_physical_nodes(
-            &input_schema,
+            &input_schema.as_arrow(),
             output_sort_key,
             vec![Arc::clone(&chunk)],
             predicate,
@@ -1267,7 +1267,7 @@ impl Deduplicater {
         debug!("Build one scan RecordBatchesExec for all non duplicated chunks even if empty");
 
         plans.push(chunks_to_physical_nodes(
-            output_schema,
+            &output_schema.as_arrow(),
             output_sort_key,
             chunks.into_no_duplicates(deduplication),
             predicate,
@@ -1452,7 +1452,7 @@ mod test {
 
         // IOx scan operator
         let input = chunks_to_physical_nodes(
-            chunk.schema(),
+            &chunk.schema().as_arrow(),
            None,
            vec![Arc::clone(&chunk)],
            Predicate::default(),
@@ -1540,7 +1540,7 @@ mod test {
 
         // IOx scan operator
         let input = chunks_to_physical_nodes(
-            chunk.schema(),
+            &chunk.schema().as_arrow(),
            None,
            vec![Arc::clone(&chunk)],
            Predicate::default(),

@@ -4,6 +4,7 @@ use crate::{
     provider::record_batch_exec::RecordBatchesExec, util::arrow_sort_key_exprs, QueryChunk,
     QueryChunkData,
 };
+use arrow::datatypes::SchemaRef;
 use datafusion::{
     datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
     physical_expr::execution_props::ExecutionProps,
@@ -135,14 +136,14 @@ fn combine_sort_key(
 /// pushdown ([`RecordBatchesExec`] has NO builtin filter function). Delete predicates are NOT applied at all. The
 /// caller is responsible for wrapping the output node into appropriate filter nodes.
 pub fn chunks_to_physical_nodes(
-    iox_schema: &Schema,
+    schema: &SchemaRef,
     output_sort_key: Option<&SortKey>,
     chunks: Vec<Arc<dyn QueryChunk>>,
     predicate: Predicate,
     target_partitions: usize,
 ) -> Arc<dyn ExecutionPlan> {
     if chunks.is_empty() {
-        return Arc::new(EmptyExec::new(false, iox_schema.as_arrow()));
+        return Arc::new(EmptyExec::new(false, Arc::clone(schema)));
     }
 
     let mut record_batch_chunks: Vec<Arc<dyn QueryChunk>> = vec![];
@@ -177,7 +178,7 @@ pub fn chunks_to_physical_nodes(
     if !record_batch_chunks.is_empty() {
         output_nodes.push(Arc::new(RecordBatchesExec::new(
             record_batch_chunks,
-            iox_schema.as_arrow(),
+            Arc::clone(schema),
         )));
     }
     let mut parquet_chunks: Vec<_> = parquet_chunks.into_iter().collect();
@@ -202,14 +203,12 @@ pub fn chunks_to_physical_nodes(
     );
 
     // Tell datafusion about the sort key, if any
-    let file_schema = iox_schema.as_arrow();
-    let output_ordering =
-        sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, &file_schema));
+    let output_ordering = sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, schema));
 
     let props = ExecutionProps::new();
     let filter_expr = predicate.filter_expr()
         .and_then(|filter_expr| {
-            match create_physical_expr_from_schema(&props, &filter_expr, &file_schema) {
+            match create_physical_expr_from_schema(&props, &filter_expr, schema) {
                 Ok(f) => Some(f),
                 Err(e) => {
                     warn!(%e, ?filter_expr, "Error creating physical filter expression, can not push down");
@@ -220,7 +219,7 @@ pub fn chunks_to_physical_nodes(
 
     let base_config = FileScanConfig {
         object_store_url,
-        file_schema,
+        file_schema: Arc::clone(schema),
         file_groups,
         statistics: Statistics::default(),
         projection: None,
@@ -361,7 +360,7 @@ mod tests {
 
     #[test]
     fn test_chunks_to_physical_nodes_empty() {
-        let schema = TestChunk::new("table").schema().clone();
+        let schema = TestChunk::new("table").schema().as_arrow();
         let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), 2);
         insta::assert_yaml_snapshot!(
             format_execution_plan(&plan),
@@ -375,7 +374,7 @@ mod tests {
     #[test]
     fn test_chunks_to_physical_nodes_recordbatch() {
         let chunk = TestChunk::new("table");
-        let schema = chunk.schema().clone();
+        let schema = chunk.schema().as_arrow();
         let plan =
             chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2);
         insta::assert_yaml_snapshot!(
@@ -391,7 +390,7 @@ mod tests {
     #[test]
     fn test_chunks_to_physical_nodes_parquet_one_file() {
         let chunk = TestChunk::new("table").with_dummy_parquet_file();
-        let schema = chunk.schema().clone();
+        let schema = chunk.schema().as_arrow();
         let plan =
             chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2);
         insta::assert_yaml_snapshot!(
@@ -409,7 +408,7 @@ mod tests {
         let chunk1 = TestChunk::new("table").with_id(0).with_dummy_parquet_file();
         let chunk2 = TestChunk::new("table").with_id(1).with_dummy_parquet_file();
         let chunk3 = TestChunk::new("table").with_id(2).with_dummy_parquet_file();
-        let schema = chunk1.schema().clone();
+        let schema = chunk1.schema().as_arrow();
         let plan = chunks_to_physical_nodes(
             &schema,
             None,
@@ -435,7 +434,7 @@ mod tests {
         let chunk2 = TestChunk::new("table")
             .with_id(1)
             .with_dummy_parquet_file_and_store("iox2://");
-        let schema = chunk1.schema().clone();
+        let schema = chunk1.schema().as_arrow();
         let plan = chunks_to_physical_nodes(
             &schema,
             None,
@@ -458,7 +457,7 @@ mod tests {
     fn test_chunks_to_physical_nodes_mixed() {
         let chunk1 = TestChunk::new("table").with_dummy_parquet_file();
         let chunk2 = TestChunk::new("table");
-        let schema = chunk1.schema().clone();
+        let schema = chunk1.schema().as_arrow();
         let plan = chunks_to_physical_nodes(
             &schema,
             None,

@@ -9,7 +9,7 @@ license.workspace = true
 arrow = { workspace = true, features = ["prettyprint"] }
 arrow-flight = { workspace = true }
 arrow_util = { path = "../arrow_util" }
-assert_cmd = "2.0.8"
+assert_cmd = "2.0.9"
 bytes = "1.4"
 data_types = { path = "../data_types" }
 dml = { path = "../dml" }

@@ -63,7 +63,7 @@ parking_lot = { version = "0.12", features = ["arc_lock"] }
 parquet = { version = "34", features = ["async", "experimental"] }
 petgraph = { version = "0.6" }
 phf_shared = { version = "0.11" }
-predicates = { version = "2" }
+predicates = { version = "3" }
 prost = { version = "0.11" }
 prost-types = { version = "0.11" }
 rand = { version = "0.8", features = ["small_rng"] }