feat(flightsql): Support `GetImportedKeys` metadata endpoint with an empty RecordBatch (#7546)

* feat: support CommandGetImportedKeys metadata endpoint with tests

* chore: remove comments that is no longer valid

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Chunchun Ye 2023-04-13 13:06:47 -05:00 committed by GitHub
parent b4003a70fe
commit 8bf47df621
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 157 additions and 12 deletions

View File

@ -4,9 +4,9 @@ use std::fmt::Display;
use arrow_flight::sql::{ use arrow_flight::sql::{
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys, CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
CommandStatementQuery, CommandPreparedStatementQuery, CommandStatementQuery,
}; };
use bytes::Bytes; use bytes::Bytes;
use prost::Message; use prost::Message;
@ -83,6 +83,8 @@ pub enum FlightSQLCommand {
/// table's primary key columns (the foreign keys exported by a table) of a table. /// table's primary key columns (the foreign keys exported by a table) of a table.
/// See [`CommandGetExportedKeys`] for details. /// See [`CommandGetExportedKeys`] for details.
CommandGetExportedKeys(CommandGetExportedKeys), CommandGetExportedKeys(CommandGetExportedKeys),
/// Get the foreign keys of a table. See [`CommandGetImportedKeys`] for details.
CommandGetImportedKeys(CommandGetImportedKeys),
/// Get a list of primary keys. See [`CommandGetPrimaryKeys`] for details. /// Get a list of primary keys. See [`CommandGetPrimaryKeys`] for details.
CommandGetPrimaryKeys(CommandGetPrimaryKeys), CommandGetPrimaryKeys(CommandGetPrimaryKeys),
/// Get a list of the available tables /// Get a list of the available tables
@ -133,6 +135,19 @@ impl Display for FlightSQLCommand {
table table
) )
} }
Self::CommandGetImportedKeys(CommandGetImportedKeys {
catalog,
db_schema,
table,
}) => {
write!(
f,
"CommandGetImportedKeys(catalog={}, db_schema={}, table={})",
catalog.as_ref().map(|c| c.as_str()).unwrap_or("<NONE>"),
db_schema.as_ref().map(|c| c.as_str()).unwrap_or("<NONE>"),
table
)
}
Self::CommandGetPrimaryKeys(CommandGetPrimaryKeys { Self::CommandGetPrimaryKeys(CommandGetPrimaryKeys {
catalog, catalog,
db_schema, db_schema,
@ -208,6 +223,8 @@ impl FlightSQLCommand {
Ok(Self::CommandGetDbSchemas(decoded_cmd)) Ok(Self::CommandGetDbSchemas(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetExportedKeys>(&msg)? { } else if let Some(decoded_cmd) = Any::unpack::<CommandGetExportedKeys>(&msg)? {
Ok(Self::CommandGetExportedKeys(decoded_cmd)) Ok(Self::CommandGetExportedKeys(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetImportedKeys>(&msg)? {
Ok(Self::CommandGetImportedKeys(decoded_cmd))
} else if let Some(decode_cmd) = Any::unpack::<CommandGetPrimaryKeys>(&msg)? { } else if let Some(decode_cmd) = Any::unpack::<CommandGetPrimaryKeys>(&msg)? {
Ok(Self::CommandGetPrimaryKeys(decode_cmd)) Ok(Self::CommandGetPrimaryKeys(decode_cmd))
} else if let Some(decode_cmd) = Any::unpack::<CommandGetTables>(&msg)? { } else if let Some(decode_cmd) = Any::unpack::<CommandGetTables>(&msg)? {
@ -248,6 +265,7 @@ impl FlightSQLCommand {
FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetExportedKeys(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetExportedKeys(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetImportedKeys(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetPrimaryKeys(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetPrimaryKeys(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTables(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetTables(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd),

View File

@ -11,8 +11,9 @@ use arrow::{
use arrow_flight::{ use arrow_flight::{
sql::{ sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys, CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandStatementQuery, CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
CommandStatementQuery,
}, },
IpcMessage, SchemaAsIpc, IpcMessage, SchemaAsIpc,
}; };
@ -70,6 +71,9 @@ impl FlightSQLPlanner {
FlightSQLCommand::CommandGetExportedKeys(CommandGetExportedKeys { .. }) => { FlightSQLCommand::CommandGetExportedKeys(CommandGetExportedKeys { .. }) => {
encode_schema(&GET_EXPORTED_KEYS_SCHEMA) encode_schema(&GET_EXPORTED_KEYS_SCHEMA)
} }
FlightSQLCommand::CommandGetImportedKeys(CommandGetImportedKeys { .. }) => {
encode_schema(&GET_IMPORTED_KEYS_SCHEMA)
}
FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { .. }) => { FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { .. }) => {
encode_schema(&GET_PRIMARY_KEYS_SCHEMA) encode_schema(&GET_PRIMARY_KEYS_SCHEMA)
} }
@ -144,6 +148,20 @@ impl FlightSQLPlanner {
let plan = plan_get_exported_keys(ctx, catalog, db_schema, table).await?; let plan = plan_get_exported_keys(ctx, catalog, db_schema, table).await?;
Ok(ctx.create_physical_plan(&plan).await?) Ok(ctx.create_physical_plan(&plan).await?)
} }
FlightSQLCommand::CommandGetImportedKeys(CommandGetImportedKeys {
catalog,
db_schema,
table,
}) => {
debug!(
?catalog,
?db_schema,
?table,
"Planning CommandGetImportedKeys query"
);
let plan = plan_get_imported_keys(ctx, catalog, db_schema, table).await?;
Ok(ctx.create_physical_plan(&plan).await?)
}
FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys {
catalog, catalog,
db_schema, db_schema,
@ -308,6 +326,16 @@ async fn plan_get_exported_keys(
Ok(ctx.batch_to_logical_plan(batch)?) Ok(ctx.batch_to_logical_plan(batch)?)
} }
async fn plan_get_imported_keys(
ctx: &IOxSessionContext,
_catalog: Option<String>,
_db_schema: Option<String>,
_table: String,
) -> Result<LogicalPlan> {
let batch = RecordBatch::new_empty(Arc::clone(&GET_IMPORTED_KEYS_SCHEMA));
Ok(ctx.batch_to_logical_plan(batch)?)
}
async fn plan_get_primary_keys( async fn plan_get_primary_keys(
ctx: &IOxSessionContext, ctx: &IOxSessionContext,
_catalog: Option<String>, _catalog: Option<String>,
@ -373,10 +401,24 @@ static GET_EXPORTED_KEYS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
Field::new("key_sequence", DataType::Int32, false), Field::new("key_sequence", DataType::Int32, false),
Field::new("fk_key_name", DataType::Utf8, false), Field::new("fk_key_name", DataType::Utf8, false),
Field::new("pk_key_name", DataType::Utf8, false), Field::new("pk_key_name", DataType::Utf8, false),
// According to the definition in https://github.com/apache/arrow/blob/0434ab65075ecd1d2ab9245bcd7ec6038934ed29/format/FlightSql.proto#L1327-L1328 Field::new("update_rule", DataType::UInt8, false),
// update_rule and delete_rule are in type uint1 Field::new("delete_rule", DataType::UInt8, false),
// However, Rust DataType does not have this type, ]))
// the closet is DataType::UInt8 });
static GET_IMPORTED_KEYS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
Arc::new(Schema::new(vec![
Field::new("pk_catalog_name", DataType::Utf8, false),
Field::new("pk_db_schema_name", DataType::Utf8, false),
Field::new("pk_table_name", DataType::Utf8, false),
Field::new("pk_column_name", DataType::Utf8, false),
Field::new("fk_catalog_name", DataType::Utf8, false),
Field::new("fk_db_schema_name", DataType::Utf8, false),
Field::new("fk_table_name", DataType::Utf8, false),
Field::new("fk_column_name", DataType::Utf8, false),
Field::new("key_sequence", DataType::Int32, false),
Field::new("fk_key_name", DataType::Utf8, false),
Field::new("pk_key_name", DataType::Utf8, false),
Field::new("update_rule", DataType::UInt8, false), Field::new("update_rule", DataType::UInt8, false),
Field::new("delete_rule", DataType::UInt8, false), Field::new("delete_rule", DataType::UInt8, false),
])) ]))

View File

@ -984,6 +984,52 @@ async fn flightsql_get_exported_keys() {
.await .await
} }
#[tokio::test]
async fn flightsql_get_imported_keys() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut client = flightsql_client(state.cluster());
let catalog: Option<String> = None;
let db_schema: Option<String> = None;
let stream = client
.get_imported_keys(catalog, db_schema, table_name.to_string())
.await
.unwrap();
let batches = collect_stream(stream).await;
insta::assert_yaml_snapshot!(
batches_to_sorted_lines(&batches),
@r###"
---
- ++
- ++
"###
);
}
.boxed()
})),
],
)
.run()
.await
}
#[tokio::test] #[tokio::test]
async fn flightsql_get_primary_keys() { async fn flightsql_get_primary_keys() {
test_helpers::maybe_start_logging(); test_helpers::maybe_start_logging();

View File

@ -137,6 +137,12 @@ public class Main {
System.out.println("**************"); System.out.println("**************");
print_result_set(md.getExportedKeys(null, null, "system")); print_result_set(md.getExportedKeys(null, null, "system"));
System.out.println("**************");
System.out.println("ImportedKeys");
System.out.println("**************");
print_result_set(md.getImportedKeys(null, null, "system"));
System.out.println("**************"); System.out.println("**************");
System.out.println("PrimaryKeys:"); System.out.println("PrimaryKeys:");
System.out.println("**************"); System.out.println("**************");

View File

@ -29,9 +29,9 @@ use arrow_flight::{
error::{FlightError, Result}, error::{FlightError, Result},
sql::{ sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys, CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables,
CommandStatementQuery, ProstMessageExt, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt,
}, },
Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket,
}; };
@ -215,6 +215,38 @@ impl FlightSqlClient {
self.do_get_with_cmd(msg.as_any()).await self.do_get_with_cmd(msg.as_any()).await
} }
/// List the foreign keys of a table on this server using a
/// [`CommandGetImportedKeys`] message.
///
/// # Parameters
///
/// Definition from <https://github.com/apache/arrow/blob/196222dbd543d6931f4a1432845add97be0db802/format/FlightSql.proto#L1354-L1403>
///
/// catalog: Specifies the catalog to search for the primary key table.
/// An empty string retrieves those without a catalog.
/// If omitted the catalog name should not be used to narrow the search.
///
/// db_schema: Specifies the schema to search for the primary key table.
/// An empty string retrieves those without a schema.
/// If omitted the schema name should not be used to narrow the search.
///
/// table: Specifies the primary key table to get the foreign keys for.
///
/// This implementation does not support alternate endpoints
pub async fn get_imported_keys(
&mut self,
catalog: Option<impl Into<String> + Send>,
db_schema: Option<impl Into<String> + Send>,
table: String,
) -> Result<FlightRecordBatchStream> {
let msg = CommandGetImportedKeys {
catalog: catalog.map(|s| s.into()),
db_schema: db_schema.map(|s| s.into()),
table,
};
self.do_get_with_cmd(msg.as_any()).await
}
/// List the primary keys on this server using a [`CommandGetPrimaryKeys`] message. /// List the primary keys on this server using a [`CommandGetPrimaryKeys`] message.
/// ///
/// # Parameters /// # Parameters

View File

@ -796,6 +796,7 @@ fn flightsql_permissions(namespace_name: &str, cmd: &FlightSQLCommand) -> Vec<au
FlightSQLCommand::CommandGetCatalogs(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetCatalogs(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetDbSchemas(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetDbSchemas(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetExportedKeys(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetExportedKeys(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetImportedKeys(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetPrimaryKeys(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetPrimaryKeys(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetTables(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetTables(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetTableTypes(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetTableTypes(_) => authz::Action::ReadSchema,