feat(flightsql): Support `GetTableTypes` metadata API (#7242)

* feat(flightsql): Support `GetTableTypes` metadata API

* chore: comment about possible optimization

* chore: fix logical conflict
pull/24376/head
Andrew Lamb 2023-03-17 14:12:23 +01:00 committed by GitHub
parent f0fc79ee3b
commit 2bc495b29e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 5 deletions

View File

@ -4,7 +4,8 @@ use std::fmt::Display;
use arrow_flight::sql::{ use arrow_flight::sql::{
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandPreparedStatementQuery, CommandStatementQuery, CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandPreparedStatementQuery,
CommandStatementQuery,
}; };
use bytes::Bytes; use bytes::Bytes;
use prost::Message; use prost::Message;
@ -74,6 +75,8 @@ pub enum FlightSQLCommand {
/// Get a list of the available schemas. See [`CommandGetDbSchemas`] /// Get a list of the available schemas. See [`CommandGetDbSchemas`]
/// for details and how to interpret the parameters. /// for details and how to interpret the parameters.
CommandGetDbSchemas(CommandGetDbSchemas), CommandGetDbSchemas(CommandGetDbSchemas),
/// Get a list of the available table tyypes
CommandGetTableTypes(CommandGetTableTypes),
/// Create a prepared statement /// Create a prepared statement
ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest), ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest),
/// Close a prepared statement /// Close a prepared statement
@ -102,6 +105,9 @@ impl Display for FlightSQLCommand {
.unwrap_or("<NONE>") .unwrap_or("<NONE>")
) )
} }
Self::CommandGetTableTypes(CommandGetTableTypes {}) => {
write!(f, "CommandGetTableTypes")
}
Self::ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest { Self::ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest {
query, query,
}) => { }) => {
@ -133,6 +139,8 @@ impl FlightSQLCommand {
Ok(Self::CommandGetCatalogs(decoded_cmd)) Ok(Self::CommandGetCatalogs(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetDbSchemas>(&msg)? { } else if let Some(decoded_cmd) = Any::unpack::<CommandGetDbSchemas>(&msg)? {
Ok(Self::CommandGetDbSchemas(decoded_cmd)) Ok(Self::CommandGetDbSchemas(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetTableTypes>(&msg)? {
Ok(Self::CommandGetTableTypes(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<ActionCreatePreparedStatementRequest>(&msg)? } else if let Some(decoded_cmd) = Any::unpack::<ActionCreatePreparedStatementRequest>(&msg)?
{ {
Ok(Self::ActionCreatePreparedStatementRequest(decoded_cmd)) Ok(Self::ActionCreatePreparedStatementRequest(decoded_cmd))
@ -165,6 +173,7 @@ impl FlightSQLCommand {
} }
FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd),
FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd), FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd),
FlightSQLCommand::ActionClosePreparedStatementRequest(handle) => { FlightSQLCommand::ActionClosePreparedStatementRequest(handle) => {
let prepared_statement_handle = handle.encode(); let prepared_statement_handle = handle.encode();

View File

@ -5,7 +5,7 @@ use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions};
use arrow_flight::{ use arrow_flight::{
sql::{ sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandStatementQuery, CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandStatementQuery,
}, },
IpcMessage, SchemaAsIpc, IpcMessage, SchemaAsIpc,
}; };
@ -45,6 +45,8 @@ impl FlightSQLPlanner {
} }
FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => { FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => {
let plan = plan_get_catalogs(ctx).await?; let plan = plan_get_catalogs(ctx).await?;
// As an optimization, we could hard code the result
// schema instead of recomputing it each time.
get_schema_for_plan(plan) get_schema_for_plan(plan)
} }
FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas {
@ -52,6 +54,14 @@ impl FlightSQLPlanner {
db_schema_filter_pattern, db_schema_filter_pattern,
}) => { }) => {
let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?;
// As an optimization, we could hard code the result
// schema instead of recomputing it each time.
get_schema_for_plan(plan)
}
FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => {
let plan = plan_get_table_types(ctx).await?;
// As an optimization, we could hard code the result
// schema instead of recomputing it each time.
get_schema_for_plan(plan) get_schema_for_plan(plan)
} }
FlightSQLCommand::ActionCreatePreparedStatementRequest(_) FlightSQLCommand::ActionCreatePreparedStatementRequest(_)
@ -100,6 +110,11 @@ impl FlightSQLPlanner {
let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?;
Ok(ctx.create_physical_plan(&plan).await?) Ok(ctx.create_physical_plan(&plan).await?)
} }
FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => {
debug!("Planning GetTableTypes query");
let plan = plan_get_table_types(ctx).await?;
Ok(ctx.create_physical_plan(&plan).await?)
}
FlightSQLCommand::ActionClosePreparedStatementRequest(_) FlightSQLCommand::ActionClosePreparedStatementRequest(_)
| FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => ProtocolSnafu { | FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => ProtocolSnafu {
cmd: format!("{cmd:?}"), cmd: format!("{cmd:?}"),
@ -237,3 +252,13 @@ async fn plan_get_db_schemas(
debug!(?plan, "Prepared plan is"); debug!(?plan, "Prepared plan is");
Ok(plan.with_param_values(params)?) Ok(plan.with_param_values(params)?)
} }
/// Return a `LogicalPlan` for GetTableTypes
///
/// In the future this could be made more efficient by building the
/// response directly from the IOx catalog rather than running an
/// entire DataFusion plan.
async fn plan_get_table_types(ctx: &IOxSessionContext) -> Result<LogicalPlan> {
let query = "SELECT DISTINCT table_type FROM information_schema.tables ORDER BY table_type";
Ok(ctx.sql_to_logical_plan(query).await?)
}

View File

@ -184,6 +184,51 @@ async fn flightsql_get_catalogs() {
.await .await
} }
#[tokio::test]
async fn flightsql_get_table_types() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut client = flightsql_client(state.cluster());
let stream = client.get_table_types().await.unwrap();
let batches = collect_stream(stream).await;
insta::assert_yaml_snapshot!(
batches_to_sorted_lines(&batches),
@r###"
---
- +------------+
- "| table_type |"
- +------------+
- "| BASE TABLE |"
- "| VIEW |"
- +------------+
"###
);
}
.boxed()
})),
],
)
.run()
.await
}
#[tokio::test] #[tokio::test]
async fn flightsql_get_db_schemas() { async fn flightsql_get_db_schemas() {
test_helpers::maybe_start_logging(); test_helpers::maybe_start_logging();
@ -394,6 +439,7 @@ async fn flightsql_jdbc() {
------------\n\ ------------\n\
public"; public";
// CommandGetSchemas output
let expected_schemas = "**************\n\ let expected_schemas = "**************\n\
Schemas:\n\ Schemas:\n\
**************\n\ **************\n\
@ -403,6 +449,15 @@ async fn flightsql_jdbc() {
iox, public\n\ iox, public\n\
system, public"; system, public";
// CommandGetTableTypes output
let expected_table_types = "**************\n\
Table Types:\n\
**************\n\
TABLE_TYPE\n\
------------\n\
BASE TABLE\n\
VIEW";
// Validate metadata: jdbc_client <url> metadata // Validate metadata: jdbc_client <url> metadata
Command::from_std(std::process::Command::new(&path)) Command::from_std(std::process::Command::new(&path))
.arg(&jdbc_url) .arg(&jdbc_url)
@ -410,7 +465,8 @@ async fn flightsql_jdbc() {
.assert() .assert()
.success() .success()
.stdout(predicate::str::contains(expected_catalogs)) .stdout(predicate::str::contains(expected_catalogs))
.stdout(predicate::str::contains(expected_schemas)); .stdout(predicate::str::contains(expected_schemas))
.stdout(predicate::str::contains(expected_table_types));
} }
.boxed() .boxed()
})), })),

View File

@ -132,6 +132,11 @@ public class Main {
System.out.println("**************"); System.out.println("**************");
print_result_set(md.getSchemas()); print_result_set(md.getSchemas());
System.out.println("**************");
System.out.println("Table Types:");
System.out.println("**************");
print_result_set(md.getTableTypes());
//System.out.println("isReadOnly: " + md.isReadOnly()); //System.out.println("isReadOnly: " + md.isReadOnly());
//System.out.println("getSearchStringEscape: " + md.getSearchStringEscape()); //System.out.println("getSearchStringEscape: " + md.getSearchStringEscape());
//System.out.println("getDriverVersion: " + md.getDriverVersion()); //System.out.println("getDriverVersion: " + md.getDriverVersion());

View File

@ -29,8 +29,8 @@ use arrow_flight::{
error::{FlightError, Result}, error::{FlightError, Result},
sql::{ sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandPreparedStatementQuery, CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes,
CommandStatementQuery, ProstMessageExt, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt,
}, },
Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket,
}; };
@ -164,6 +164,16 @@ impl FlightSqlClient {
self.do_get_with_cmd(msg.as_any()).await self.do_get_with_cmd(msg.as_any()).await
} }
/// List the table types on this server using a [`CommandGetTableTypes`] message.
///
/// This implementation does not support alternate endpoints
///
/// [`CommandGetTableTypes`]: https://github.com/apache/arrow/blob/44edc27e549d82db930421b0d4c76098941afd71/format/FlightSql.proto#L1243-L1259
pub async fn get_table_types(&mut self) -> Result<FlightRecordBatchStream> {
let msg = CommandGetTableTypes {};
self.do_get_with_cmd(msg.as_any()).await
}
/// Implements the canonical interaction for most FlightSQL messages: /// Implements the canonical interaction for most FlightSQL messages:
/// ///
/// 1. Call `GetFlightInfo` with the provided message, and get a /// 1. Call `GetFlightInfo` with the provided message, and get a