feat(flightsql): Avoid planning Flight queries twice (#7281)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-03-21 21:15:53 +01:00 committed by GitHub
parent f780aba353
commit 777128f2b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 42 deletions

View File

@ -1,7 +1,11 @@
//! FlightSQL handling
use std::sync::Arc;
use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions};
use arrow::{
datatypes::{DataType, Field, Schema},
error::ArrowError,
ipc::writer::IpcWriteOptions,
};
use arrow_flight::{
sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
@ -14,6 +18,7 @@ use bytes::Bytes;
use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan, scalar::ScalarValue};
use iox_query::{exec::IOxSessionContext, QueryNamespace};
use observability_deps::tracing::debug;
use once_cell::sync::Lazy;
use prost::Message;
use crate::{error::*, sql_info::iox_sql_info_list};
@ -44,50 +49,20 @@ impl FlightSQLPlanner {
FlightSQLCommand::CommandPreparedStatementQuery(handle) => {
get_schema_for_query(handle.query(), ctx).await
}
FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { info }) => {
let plan = plan_get_sql_info(ctx, info).await?;
// As an optimization, we could hard code the result
// schema instead of recomputing it each time.
get_schema_for_plan(plan)
FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { .. }) => {
encode_schema(iox_sql_info_list().schema())
}
FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => {
let plan = plan_get_catalogs(ctx).await?;
// As an optimization, we could hard code the result
// schema instead of recomputing it each time.
get_schema_for_plan(plan)
encode_schema(&GET_CATALOG_SCHEMA)
}
FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas {
catalog,
db_schema_filter_pattern,
}) => {
let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?;
// As an optimization, we could hard code the result
// schema instead of recomputing it each time.
get_schema_for_plan(plan)
FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { .. }) => {
encode_schema(&GET_DB_SCHEMAS_SCHEMA)
}
FlightSQLCommand::CommandGetTables(CommandGetTables {
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema,
}) => {
let plan = plan_get_tables(
ctx,
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema,
)
.await?;
get_schema_for_plan(plan)
FlightSQLCommand::CommandGetTables(CommandGetTables { .. }) => {
encode_schema(&GET_TABLES_SCHEMA)
}
FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => {
let plan = plan_get_table_types(ctx).await?;
// As an optimization, we could hard code the result
// schema instead of recomputing it each time.
get_schema_for_plan(plan)
FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes { .. }) => {
encode_schema(&GET_TABLE_TYPE_SCHEMA)
}
FlightSQLCommand::ActionCreatePreparedStatementRequest(_)
| FlightSQLCommand::ActionClosePreparedStatementRequest(_) => ProtocolSnafu {
@ -269,6 +244,10 @@ async fn plan_get_catalogs(ctx: &IOxSessionContext) -> Result<LogicalPlan> {
Ok(ctx.sql_to_logical_plan(query).await?)
}
/// The schema for GetCatalogs
static GET_CATALOG_SCHEMA: Lazy<Schema> =
Lazy::new(|| Schema::new(vec![Field::new("catalog_name", DataType::Utf8, false)]));
/// Return a `LogicalPlan` for GetDbSchemas
///
/// # Parameters
@ -310,6 +289,14 @@ async fn plan_get_db_schemas(
Ok(plan.with_param_values(params)?)
}
/// The schema for GetDbSchemas
static GET_DB_SCHEMAS_SCHEMA: Lazy<Schema> = Lazy::new(|| {
Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, false),
Field::new("db_schema_name", DataType::Utf8, false),
])
});
/// Return a `LogicalPlan` for GetTables
async fn plan_get_tables(
ctx: &IOxSessionContext,
@ -327,6 +314,16 @@ async fn plan_get_tables(
Ok(ctx.sql_to_logical_plan(query).await?)
}
/// The schema for GetTables
static GET_TABLES_SCHEMA: Lazy<Schema> = Lazy::new(|| {
Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, false),
Field::new("db_schema_name", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("table_type", DataType::Utf8, false),
])
});
/// Return a `LogicalPlan` for GetTableTypes
///
/// In the future this could be made more efficient by building the
@ -336,3 +333,7 @@ async fn plan_get_table_types(ctx: &IOxSessionContext) -> Result<LogicalPlan> {
let query = "SELECT DISTINCT table_type FROM information_schema.tables ORDER BY table_type";
Ok(ctx.sql_to_logical_plan(query).await?)
}
/// The schema for GetTableTypes
static GET_TABLE_TYPE_SCHEMA: Lazy<Schema> =
Lazy::new(|| Schema::new(vec![Field::new("table_type", DataType::Utf8, false)]));

View File

@ -24,7 +24,11 @@ mod value;
use crate::error::Result;
use std::{borrow::Cow, collections::BTreeMap, sync::Arc};
use arrow::{array::UInt32Builder, record_batch::RecordBatch};
use arrow::{
array::UInt32Builder,
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use arrow_flight::sql::{
SqlInfo, SqlNullOrdering, SqlSupportedCaseSensitivity, SqlSupportedTransactions,
SupportedSqlGrammar,
@ -93,8 +97,22 @@ impl SqlInfoList {
])?;
Ok(batch)
}
/// Return the schema for the record batches produced
pub fn schema(&self) -> &Schema {
// It is always the same
&SCHEMA
}
}
// The schema produced by [`SqlInfoList`]
static SCHEMA: Lazy<Schema> = Lazy::new(|| {
Schema::new(vec![
Field::new("info_name", DataType::UInt32, false),
Field::new("value", SqlInfoUnionBuilder::schema().clone(), false),
])
});
#[allow(non_snake_case)]
static INSTANCE: Lazy<SqlInfoList> = Lazy::new(|| {
// The following are not defined in the [`SqlInfo`], but are
@ -237,7 +255,7 @@ static INSTANCE: Lazy<SqlInfoList> = Lazy::new(|| {
.with_sql_info(SqlInfo::SqlStoredFunctionsUsingCallSyntaxSupported, false)
});
/// Return the static SqlInfoList that describes IOx's capablity
/// Return a [`SqlInfoList`] that describes IOx's capablities
pub fn iox_sql_info_list() -> &'static SqlInfoList {
&INSTANCE
}

View File

@ -143,6 +143,11 @@ impl SqlInfoUnionBuilder {
}
}
/// Returns the DataType created by this builder
pub fn schema() -> &'static DataType {
&UNION_TYPE
}
/// Append the specified value to this builder
pub fn append_value(&mut self, v: &SqlInfoValue) {
// typeid is which child and len is the child array's length