feat(flightsql): Add `GetTables` metadata endpoint without filters (#7238)

* test: Add getTables jdbc_client example

* feat: add `CommandGetTables` in FlightSqlClient

* feat: add `CommandGetTables` in flightsql cmd and planner

* test: add e2e test for `CommandGetTables`

* chore: clippy

* chore: comment out the test with filters

* test: update jdbc test expected value for tables

---------

Co-authored-by: Chunchun <14298407+appletreeisyellow@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-03-20 20:20:13 +01:00 committed by GitHub
parent 1d8c85e61c
commit 795a79a7ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 187 additions and 4 deletions

View File

@ -4,8 +4,8 @@ use std::fmt::Display;
use arrow_flight::sql::{
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandPreparedStatementQuery,
CommandStatementQuery,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandGetTables,
CommandPreparedStatementQuery, CommandStatementQuery,
};
use bytes::Bytes;
use prost::Message;
@ -75,6 +75,8 @@ pub enum FlightSQLCommand {
/// Get a list of the available schemas. See [`CommandGetDbSchemas`]
/// for details and how to interpret the parameters.
CommandGetDbSchemas(CommandGetDbSchemas),
/// Get a list of the available tables
CommandGetTables(CommandGetTables),
/// Get a list of the available table tyypes
CommandGetTableTypes(CommandGetTableTypes),
/// Create a prepared statement
@ -105,6 +107,30 @@ impl Display for FlightSQLCommand {
.unwrap_or("<NONE>")
)
}
Self::CommandGetTables(CommandGetTables {
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema,
}) => {
write!(
f,
"CommandGetTables(catalog={}, db_schema_filter_pattern={},\
table_name_filter_pattern={},table_types={},include_schema={})",
catalog.as_ref().map(|c| c.as_ref()).unwrap_or("<NONE>"),
db_schema_filter_pattern
.as_ref()
.map(|db| db.as_ref())
.unwrap_or("<NONE>"),
table_name_filter_pattern
.as_ref()
.map(|t| t.as_ref())
.unwrap_or("<NONE>"),
table_types.join(","),
include_schema,
)
}
Self::CommandGetTableTypes(CommandGetTableTypes {}) => {
write!(f, "CommandGetTableTypes")
}
@ -139,6 +165,8 @@ impl FlightSQLCommand {
Ok(Self::CommandGetCatalogs(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetDbSchemas>(&msg)? {
Ok(Self::CommandGetDbSchemas(decoded_cmd))
} else if let Some(decode_cmd) = Any::unpack::<CommandGetTables>(&msg)? {
Ok(Self::CommandGetTables(decode_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetTableTypes>(&msg)? {
Ok(Self::CommandGetTableTypes(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<ActionCreatePreparedStatementRequest>(&msg)?
@ -173,6 +201,7 @@ impl FlightSQLCommand {
}
FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTables(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd),
FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd),
FlightSQLCommand::ActionClosePreparedStatementRequest(handle) => {

View File

@ -5,7 +5,8 @@ use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions};
use arrow_flight::{
sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandStatementQuery,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandGetTables,
CommandStatementQuery,
},
IpcMessage, SchemaAsIpc,
};
@ -58,6 +59,24 @@ impl FlightSQLPlanner {
// schema instead of recomputing it each time.
get_schema_for_plan(plan)
}
FlightSQLCommand::CommandGetTables(CommandGetTables {
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema,
}) => {
let plan = plan_get_tables(
ctx,
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema,
)
.await?;
get_schema_for_plan(plan)
}
FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => {
let plan = plan_get_table_types(ctx).await?;
// As an optimization, we could hard code the result
@ -110,6 +129,25 @@ impl FlightSQLPlanner {
let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?;
Ok(ctx.create_physical_plan(&plan).await?)
}
FlightSQLCommand::CommandGetTables(CommandGetTables {
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema,
}) => {
debug!("Planning GetTables query");
let plan = plan_get_tables(
ctx,
catalog,
db_schema_filter_pattern,
table_name_filter_pattern,
table_types,
include_schema,
)
.await?;
Ok(ctx.create_physical_plan(&plan).await?)
}
FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => {
debug!("Planning GetTableTypes query");
let plan = plan_get_table_types(ctx).await?;
@ -253,6 +291,23 @@ async fn plan_get_db_schemas(
Ok(plan.with_param_values(params)?)
}
/// Return a `LogicalPlan` for GetTables
async fn plan_get_tables(
ctx: &IOxSessionContext,
_catalog: Option<String>,
_db_schema_filter_pattern: Option<String>,
_table_name_filter_pattern: Option<String>,
_table_types: Vec<String>,
_include_schema: bool,
) -> Result<LogicalPlan> {
let query = "SELECT table_catalog AS catalog_name, \
table_schema AS db_schema_name, table_name, table_type \
FROM information_schema.tables \
ORDER BY table_catalog, table_schema, table_name";
Ok(ctx.sql_to_logical_plan(query).await?)
}
/// Return a `LogicalPlan` for GetTableTypes
///
/// In the future this could be made more efficient by building the

View File

@ -184,6 +184,58 @@ async fn flightsql_get_catalogs() {
.await
}
#[tokio::test]
async fn flightsql_get_tables() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut client = flightsql_client(state.cluster());
let stream = client
.get_tables(Some(""), Some(""), Some(""), None)
.await
.unwrap();
let batches = collect_stream(stream).await;
insta::assert_yaml_snapshot!(
batches_to_sorted_lines(&batches),
@r###"
---
- +--------------+--------------------+-------------+------------+
- "| catalog_name | db_schema_name | table_name | table_type |"
- +--------------+--------------------+-------------+------------+
- "| public | information_schema | columns | VIEW |"
- "| public | information_schema | df_settings | VIEW |"
- "| public | information_schema | tables | VIEW |"
- "| public | information_schema | views | VIEW |"
- "| public | iox | the_table | BASE TABLE |"
- "| public | system | queries | BASE TABLE |"
- +--------------+--------------------+-------------+------------+
"###
);
}
.boxed()
})),
],
)
.run()
.await
}
#[tokio::test]
async fn flightsql_get_table_types() {
test_helpers::maybe_start_logging();
@ -448,6 +500,18 @@ async fn flightsql_jdbc() {
information_schema, public\n\
iox, public\n\
system, public";
// CommandGetTables output
let expected_tables = "**************\n\
Tables:\n\
**************\n\
TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS, TYPE_CAT, TYPE_SCHEM, TYPE_NAME, SELF_REFERENCING_COL_NAME, REF_GENERATION\n\
------------\n\
public, information_schema, columns, VIEW, null, null, null, null, null, null\n\
public, information_schema, df_settings, VIEW, null, null, null, null, null, null\n\
public, information_schema, tables, VIEW, null, null, null, null, null, null\n\
public, information_schema, views, VIEW, null, null, null, null, null, null\n\
public, iox, the_table, BASE TABLE, null, null, null, null, null, null\n\
public, system, queries, BASE TABLE, null, null, null, null, null, null";
// CommandGetTableTypes output
let expected_table_types = "**************\n\
@ -466,6 +530,7 @@ async fn flightsql_jdbc() {
.success()
.stdout(predicate::str::contains(expected_catalogs))
.stdout(predicate::str::contains(expected_schemas))
.stdout(predicate::str::contains(expected_tables))
.stdout(predicate::str::contains(expected_table_types));
}
.boxed()

View File

@ -132,6 +132,17 @@ public class Main {
System.out.println("**************");
print_result_set(md.getSchemas());
System.out.println("**************");
System.out.println("Tables:");
System.out.println("**************");
// null means no filtering
print_result_set(md.getTables(null, null, null, null));
// System.out.println("**************");
// System.out.println("Tables (system table filter):");
// System.out.println("**************");
// print_result_set(md.getTables("public", "system", null, null));
System.out.println("**************");
System.out.println("Table Types:");
System.out.println("**************");

View File

@ -29,7 +29,7 @@ use arrow_flight::{
error::{FlightError, Result},
sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandGetTables,
CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt,
},
Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket,
@ -164,6 +164,29 @@ impl FlightSqlClient {
self.do_get_with_cmd(msg.as_any()).await
}
/// List the tables on this server using a [`CommandGetTables`] message.
///
/// This implementation does not support alternate endpoints
///
/// [`CommandGetTables`]: https://github.com/apache/arrow/blob/44edc27e549d82db930421b0d4c76098941afd71/format/FlightSql.proto#L1176-L1241
pub async fn get_tables(
&mut self,
_catalog: Option<impl Into<String> + Send>,
_db_schema_filter_pattern: Option<impl Into<String> + Send>,
_table_name_filter_pattern: Option<impl Into<String> + Send>,
_table_types: Option<Vec<String>>,
) -> Result<FlightRecordBatchStream> {
let msg = CommandGetTables {
catalog: None,
db_schema_filter_pattern: None,
table_name_filter_pattern: None,
table_types: vec![],
// TODO: implement include_schema after optional query parameters are done
include_schema: false,
};
self.do_get_with_cmd(msg.as_any()).await
}
/// List the table types on this server using a [`CommandGetTableTypes`] message.
///
/// This implementation does not support alternate endpoints