feat: support CommandGetExportedKeys metadata endpoint with tests (#7532)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Chunchun Ye 2023-04-13 08:27:09 -05:00 committed by GitHub
parent 0679cae4c6
commit e86e8de1a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 161 additions and 6 deletions

View File

@ -4,8 +4,9 @@ use std::fmt::Display;
use arrow_flight::sql::{
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery,
CommandStatementQuery,
};
use bytes::Bytes;
use prost::Message;
@ -78,6 +79,10 @@ pub enum FlightSQLCommand {
/// Get a list of the available schemas. See [`CommandGetDbSchemas`]
/// for details and how to interpret the parameters.
CommandGetDbSchemas(CommandGetDbSchemas),
/// Get a description of the foreign key columns that reference the given
/// table's primary key columns (the foreign keys exported by a table) of a table.
/// See [`CommandGetExportedKeys`] for details.
CommandGetExportedKeys(CommandGetExportedKeys),
/// Get a list of primary keys. See [`CommandGetPrimaryKeys`] for details.
CommandGetPrimaryKeys(CommandGetPrimaryKeys),
/// Get a list of the available tables
@ -115,6 +120,19 @@ impl Display for FlightSQLCommand {
.unwrap_or("<NONE>")
)
}
Self::CommandGetExportedKeys(CommandGetExportedKeys {
catalog,
db_schema,
table,
}) => {
write!(
f,
"CommandGetExportedKeys(catalog={}, db_schema={}, table={})",
catalog.as_ref().map(|c| c.as_str()).unwrap_or("<NONE>"),
db_schema.as_ref().map(|c| c.as_str()).unwrap_or("<NONE>"),
table
)
}
Self::CommandGetPrimaryKeys(CommandGetPrimaryKeys {
catalog,
db_schema,
@ -188,6 +206,8 @@ impl FlightSQLCommand {
Ok(Self::CommandGetCatalogs(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetDbSchemas>(&msg)? {
Ok(Self::CommandGetDbSchemas(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetExportedKeys>(&msg)? {
Ok(Self::CommandGetExportedKeys(decoded_cmd))
} else if let Some(decode_cmd) = Any::unpack::<CommandGetPrimaryKeys>(&msg)? {
Ok(Self::CommandGetPrimaryKeys(decode_cmd))
} else if let Some(decode_cmd) = Any::unpack::<CommandGetTables>(&msg)? {
@ -227,6 +247,7 @@ impl FlightSQLCommand {
FlightSQLCommand::CommandGetSqlInfo(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetExportedKeys(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetPrimaryKeys(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTables(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd),

View File

@ -11,8 +11,8 @@ use arrow::{
use arrow_flight::{
sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandStatementQuery,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandStatementQuery,
},
IpcMessage, SchemaAsIpc,
};
@ -67,6 +67,9 @@ impl FlightSQLPlanner {
FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { .. }) => {
encode_schema(get_db_schemas_schema().as_ref())
}
FlightSQLCommand::CommandGetExportedKeys(CommandGetExportedKeys { .. }) => {
encode_schema(&GET_EXPORTED_KEYS_SCHEMA)
}
FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { .. }) => {
encode_schema(&GET_PRIMARY_KEYS_SCHEMA)
}
@ -127,6 +130,20 @@ impl FlightSQLPlanner {
let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?;
Ok(ctx.create_physical_plan(&plan).await?)
}
FlightSQLCommand::CommandGetExportedKeys(CommandGetExportedKeys {
catalog,
db_schema,
table,
}) => {
debug!(
?catalog,
?db_schema,
?table,
"Planning GetExportedKeys query"
);
let plan = plan_get_exported_keys(ctx, catalog, db_schema, table).await?;
Ok(ctx.create_physical_plan(&plan).await?)
}
FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys {
catalog,
db_schema,
@ -281,6 +298,16 @@ async fn plan_get_db_schemas(
Ok(ctx.batch_to_logical_plan(batch)?)
}
async fn plan_get_exported_keys(
ctx: &IOxSessionContext,
_catalog: Option<String>,
_db_schema: Option<String>,
_table: String,
) -> Result<LogicalPlan> {
let batch = RecordBatch::new_empty(Arc::clone(&GET_EXPORTED_KEYS_SCHEMA));
Ok(ctx.batch_to_logical_plan(batch)?)
}
async fn plan_get_primary_keys(
ctx: &IOxSessionContext,
_catalog: Option<String>,
@ -333,6 +360,28 @@ static TABLE_TYPES_RECORD_BATCH: Lazy<RecordBatch> = Lazy::new(|| {
RecordBatch::try_new(Arc::clone(&GET_TABLE_TYPE_SCHEMA), vec![table_type]).unwrap()
});
static GET_EXPORTED_KEYS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
Arc::new(Schema::new(vec![
Field::new("pk_catalog_name", DataType::Utf8, false),
Field::new("pk_db_schema_name", DataType::Utf8, false),
Field::new("pk_table_name", DataType::Utf8, false),
Field::new("pk_column_name", DataType::Utf8, false),
Field::new("fk_catalog_name", DataType::Utf8, false),
Field::new("fk_db_schema_name", DataType::Utf8, false),
Field::new("fk_table_name", DataType::Utf8, false),
Field::new("fk_column_name", DataType::Utf8, false),
Field::new("key_sequence", DataType::Int32, false),
Field::new("fk_key_name", DataType::Utf8, false),
Field::new("pk_key_name", DataType::Utf8, false),
// According to the definition in https://github.com/apache/arrow/blob/0434ab65075ecd1d2ab9245bcd7ec6038934ed29/format/FlightSql.proto#L1327-L1328
// update_rule and delete_rule are in type uint1
// However, Rust DataType does not have this type,
// the closet is DataType::UInt8
Field::new("update_rule", DataType::UInt8, false),
Field::new("delete_rule", DataType::UInt8, false),
]))
});
static GET_PRIMARY_KEYS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
Arc::new(Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, false),

View File

@ -938,6 +938,52 @@ async fn flightsql_get_db_schema_matches_information_schema() {
.await
}
#[tokio::test]
async fn flightsql_get_exported_keys() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut client = flightsql_client(state.cluster());
let catalog: Option<String> = None;
let db_schema: Option<String> = None;
let stream = client
.get_exported_keys(catalog, db_schema, table_name.to_string())
.await
.unwrap();
let batches = collect_stream(stream).await;
insta::assert_yaml_snapshot!(
batches_to_sorted_lines(&batches),
@r###"
---
- ++
- ++
"###
);
}
.boxed()
})),
],
)
.run()
.await
}
#[tokio::test]
async fn flightsql_get_primary_keys() {
test_helpers::maybe_start_logging();

View File

@ -132,6 +132,11 @@ public class Main {
System.out.println("**************");
print_result_set(md.getSchemas());
System.out.println("**************");
System.out.println("ExportedKeys");
System.out.println("**************");
print_result_set(md.getExportedKeys(null, null, "system"));
System.out.println("**************");
System.out.println("PrimaryKeys:");
System.out.println("**************");

View File

@ -29,8 +29,8 @@ use arrow_flight::{
error::{FlightError, Result},
sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery,
CommandStatementQuery, ProstMessageExt,
},
Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket,
@ -182,6 +182,39 @@ impl FlightSqlClient {
self.do_get_with_cmd(msg.as_any()).await
}
/// List a description of the foreign key columns that reference the given
/// table's primary key columns (the foreign keys exported by a table) of a
/// table on this server using a [`CommandGetExportedKeys`] message.
///
/// # Parameters
///
/// Definition from <https://github.com/apache/arrow/blob/0434ab65075ecd1d2ab9245bcd7ec6038934ed29/format/FlightSql.proto#L1307-L1352>
///
/// catalog: Specifies the catalog to search for the foreign key table.
/// An empty string retrieves those without a catalog.
/// If omitted the catalog name should not be used to narrow the search.
///
/// db_schema: Specifies the schema to search for the foreign key table.
/// An empty string retrieves those without a schema.
/// If omitted the schema name should not be used to narrow the search.
///
/// table: Specifies the foreign key table to get the foreign keys for.
///
/// This implementation does not support alternate endpoints
pub async fn get_exported_keys(
&mut self,
catalog: Option<impl Into<String> + Send>,
db_schema: Option<impl Into<String> + Send>,
table: String,
) -> Result<FlightRecordBatchStream> {
let msg = CommandGetExportedKeys {
catalog: catalog.map(|s| s.into()),
db_schema: db_schema.map(|s| s.into()),
table,
};
self.do_get_with_cmd(msg.as_any()).await
}
/// List the primary keys on this server using a [`CommandGetPrimaryKeys`] message.
///
/// # Parameters

View File

@ -795,6 +795,7 @@ fn flightsql_permissions(namespace_name: &str, cmd: &FlightSQLCommand) -> Vec<au
FlightSQLCommand::CommandGetSqlInfo(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetCatalogs(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetDbSchemas(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetExportedKeys(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetPrimaryKeys(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetTables(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetTableTypes(_) => authz::Action::ReadSchema,