From 777128f2b75266092e0ab21576398a7d926eadff Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 21 Mar 2023 21:15:53 +0100 Subject: [PATCH] feat(flightsql): Avoid planning Flight queries twice (#7281) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- flightsql/src/planner.rs | 81 +++++++++++++++++---------------- flightsql/src/sql_info/mod.rs | 22 ++++++++- flightsql/src/sql_info/value.rs | 5 ++ 3 files changed, 66 insertions(+), 42 deletions(-) diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index 47d40d9167..81515fc50e 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -1,7 +1,11 @@ //! FlightSQL handling use std::sync::Arc; -use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions}; +use arrow::{ + datatypes::{DataType, Field, Schema}, + error::ArrowError, + ipc::writer::IpcWriteOptions, +}; use arrow_flight::{ sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, @@ -14,6 +18,7 @@ use bytes::Bytes; use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan, scalar::ScalarValue}; use iox_query::{exec::IOxSessionContext, QueryNamespace}; use observability_deps::tracing::debug; +use once_cell::sync::Lazy; use prost::Message; use crate::{error::*, sql_info::iox_sql_info_list}; @@ -44,50 +49,20 @@ impl FlightSQLPlanner { FlightSQLCommand::CommandPreparedStatementQuery(handle) => { get_schema_for_query(handle.query(), ctx).await } - FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { info }) => { - let plan = plan_get_sql_info(ctx, info).await?; - // As an optimization, we could hard code the result - // schema instead of recomputing it each time. - get_schema_for_plan(plan) + FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { .. }) => { + encode_schema(iox_sql_info_list().schema()) } FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => { - let plan = plan_get_catalogs(ctx).await?; - // As an optimization, we could hard code the result - // schema instead of recomputing it each time. - get_schema_for_plan(plan) + encode_schema(&GET_CATALOG_SCHEMA) } - FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { - catalog, - db_schema_filter_pattern, - }) => { - let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; - // As an optimization, we could hard code the result - // schema instead of recomputing it each time. - get_schema_for_plan(plan) + FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { .. }) => { + encode_schema(&GET_DB_SCHEMAS_SCHEMA) } - FlightSQLCommand::CommandGetTables(CommandGetTables { - catalog, - db_schema_filter_pattern, - table_name_filter_pattern, - table_types, - include_schema, - }) => { - let plan = plan_get_tables( - ctx, - catalog, - db_schema_filter_pattern, - table_name_filter_pattern, - table_types, - include_schema, - ) - .await?; - get_schema_for_plan(plan) + FlightSQLCommand::CommandGetTables(CommandGetTables { .. }) => { + encode_schema(&GET_TABLES_SCHEMA) } - FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => { - let plan = plan_get_table_types(ctx).await?; - // As an optimization, we could hard code the result - // schema instead of recomputing it each time. - get_schema_for_plan(plan) + FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes { .. }) => { + encode_schema(&GET_TABLE_TYPE_SCHEMA) } FlightSQLCommand::ActionCreatePreparedStatementRequest(_) | FlightSQLCommand::ActionClosePreparedStatementRequest(_) => ProtocolSnafu { @@ -269,6 +244,10 @@ async fn plan_get_catalogs(ctx: &IOxSessionContext) -> Result { Ok(ctx.sql_to_logical_plan(query).await?) } +/// The schema for GetCatalogs +static GET_CATALOG_SCHEMA: Lazy = + Lazy::new(|| Schema::new(vec![Field::new("catalog_name", DataType::Utf8, false)])); + /// Return a `LogicalPlan` for GetDbSchemas /// /// # Parameters @@ -310,6 +289,14 @@ async fn plan_get_db_schemas( Ok(plan.with_param_values(params)?) } +/// The schema for GetDbSchemas +static GET_DB_SCHEMAS_SCHEMA: Lazy = Lazy::new(|| { + Schema::new(vec![ + Field::new("catalog_name", DataType::Utf8, false), + Field::new("db_schema_name", DataType::Utf8, false), + ]) +}); + /// Return a `LogicalPlan` for GetTables async fn plan_get_tables( ctx: &IOxSessionContext, @@ -327,6 +314,16 @@ async fn plan_get_tables( Ok(ctx.sql_to_logical_plan(query).await?) } +/// The schema for GetTables +static GET_TABLES_SCHEMA: Lazy = Lazy::new(|| { + Schema::new(vec![ + Field::new("catalog_name", DataType::Utf8, false), + Field::new("db_schema_name", DataType::Utf8, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("table_type", DataType::Utf8, false), + ]) +}); + /// Return a `LogicalPlan` for GetTableTypes /// /// In the future this could be made more efficient by building the @@ -336,3 +333,7 @@ async fn plan_get_table_types(ctx: &IOxSessionContext) -> Result { let query = "SELECT DISTINCT table_type FROM information_schema.tables ORDER BY table_type"; Ok(ctx.sql_to_logical_plan(query).await?) } + +/// The schema for GetTableTypes +static GET_TABLE_TYPE_SCHEMA: Lazy = + Lazy::new(|| Schema::new(vec![Field::new("table_type", DataType::Utf8, false)])); diff --git a/flightsql/src/sql_info/mod.rs b/flightsql/src/sql_info/mod.rs index a851557c1e..35d8afbb65 100644 --- a/flightsql/src/sql_info/mod.rs +++ b/flightsql/src/sql_info/mod.rs @@ -24,7 +24,11 @@ mod value; use crate::error::Result; use std::{borrow::Cow, collections::BTreeMap, sync::Arc}; -use arrow::{array::UInt32Builder, record_batch::RecordBatch}; +use arrow::{ + array::UInt32Builder, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, +}; use arrow_flight::sql::{ SqlInfo, SqlNullOrdering, SqlSupportedCaseSensitivity, SqlSupportedTransactions, SupportedSqlGrammar, @@ -93,8 +97,22 @@ impl SqlInfoList { ])?; Ok(batch) } + + /// Return the schema for the record batches produced + pub fn schema(&self) -> &Schema { + // It is always the same + &SCHEMA + } } +// The schema produced by [`SqlInfoList`] +static SCHEMA: Lazy = Lazy::new(|| { + Schema::new(vec![ + Field::new("info_name", DataType::UInt32, false), + Field::new("value", SqlInfoUnionBuilder::schema().clone(), false), + ]) +}); + #[allow(non_snake_case)] static INSTANCE: Lazy = Lazy::new(|| { // The following are not defined in the [`SqlInfo`], but are @@ -237,7 +255,7 @@ static INSTANCE: Lazy = Lazy::new(|| { .with_sql_info(SqlInfo::SqlStoredFunctionsUsingCallSyntaxSupported, false) }); -/// Return the static SqlInfoList that describes IOx's capablity +/// Return a [`SqlInfoList`] that describes IOx's capablities pub fn iox_sql_info_list() -> &'static SqlInfoList { &INSTANCE } diff --git a/flightsql/src/sql_info/value.rs b/flightsql/src/sql_info/value.rs index 9fa46647ca..2168875fd2 100644 --- a/flightsql/src/sql_info/value.rs +++ b/flightsql/src/sql_info/value.rs @@ -143,6 +143,11 @@ impl SqlInfoUnionBuilder { } } + /// Returns the DataType created by this builder + pub fn schema() -> &'static DataType { + &UNION_TYPE + } + /// Append the specified value to this builder pub fn append_value(&mut self, v: &SqlInfoValue) { // typeid is which child and len is the child array's length