feat(flightsql): Support `GetPrimaryKeys` metadata endpoint with an empty RecordBatch (#7496)

* feat: add CommandGetPrimaryKeys metadata endpoint and tests

* chore: update schema for the returned record batch

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Chunchun Ye 2023-04-11 09:13:44 -05:00 committed by GitHub
parent b29bdf73ab
commit b131895fc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 146 additions and 6 deletions

View File

@ -4,8 +4,8 @@ use std::fmt::Display;
use arrow_flight::sql::{
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery,
};
use bytes::Bytes;
use prost::Message;
@ -78,6 +78,8 @@ pub enum FlightSQLCommand {
/// Get a list of the available schemas. See [`CommandGetDbSchemas`]
/// for details and how to interpret the parameters.
CommandGetDbSchemas(CommandGetDbSchemas),
/// Get a list of primary keys. See [`CommandGetPrimaryKeys`] for details.
CommandGetPrimaryKeys(CommandGetPrimaryKeys),
/// Get a list of the available tables
CommandGetTables(CommandGetTables),
/// Get a list of the available table tyypes
@ -113,6 +115,19 @@ impl Display for FlightSQLCommand {
.unwrap_or("<NONE>")
)
}
Self::CommandGetPrimaryKeys(CommandGetPrimaryKeys {
catalog,
db_schema,
table,
}) => {
write!(
f,
"CommandGetPrimaryKeys(catalog={}, db_schema={}, table={})",
catalog.as_ref().map(|c| c.as_str()).unwrap_or("<NONE>"),
db_schema.as_ref().map(|c| c.as_str()).unwrap_or("<NONE>"),
table
)
}
Self::CommandGetTables(CommandGetTables {
catalog,
db_schema_filter_pattern,
@ -173,6 +188,8 @@ impl FlightSQLCommand {
Ok(Self::CommandGetCatalogs(decoded_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetDbSchemas>(&msg)? {
Ok(Self::CommandGetDbSchemas(decoded_cmd))
} else if let Some(decode_cmd) = Any::unpack::<CommandGetPrimaryKeys>(&msg)? {
Ok(Self::CommandGetPrimaryKeys(decode_cmd))
} else if let Some(decode_cmd) = Any::unpack::<CommandGetTables>(&msg)? {
Ok(Self::CommandGetTables(decode_cmd))
} else if let Some(decoded_cmd) = Any::unpack::<CommandGetTableTypes>(&msg)? {
@ -210,6 +227,7 @@ impl FlightSQLCommand {
FlightSQLCommand::CommandGetSqlInfo(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetPrimaryKeys(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTables(cmd) => Any::pack(&cmd),
FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd),
FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd),

View File

@ -11,8 +11,8 @@ use arrow::{
use arrow_flight::{
sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables, CommandStatementQuery,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandStatementQuery,
},
IpcMessage, SchemaAsIpc,
};
@ -67,6 +67,9 @@ impl FlightSQLPlanner {
FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { .. }) => {
encode_schema(get_db_schemas_schema().as_ref())
}
FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { .. }) => {
encode_schema(&GET_PRIMARY_KEYS_SCHEMA)
}
FlightSQLCommand::CommandGetTables(CommandGetTables { include_schema, .. }) => {
encode_schema(get_tables_schema(include_schema).as_ref())
}
@ -124,6 +127,20 @@ impl FlightSQLPlanner {
let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?;
Ok(ctx.create_physical_plan(&plan).await?)
}
FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys {
catalog,
db_schema,
table,
}) => {
debug!(
?catalog,
?db_schema,
?table,
"Planning GetPrimaryKeys query"
);
let plan = plan_get_primary_keys(ctx, catalog, db_schema, table).await?;
Ok(ctx.create_physical_plan(&plan).await?)
}
FlightSQLCommand::CommandGetTables(CommandGetTables {
catalog,
db_schema_filter_pattern,
@ -264,6 +281,16 @@ async fn plan_get_db_schemas(
Ok(ctx.batch_to_logical_plan(batch)?)
}
async fn plan_get_primary_keys(
ctx: &IOxSessionContext,
_catalog: Option<String>,
_db_schema: Option<String>,
_table: String,
) -> Result<LogicalPlan> {
let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![])));
Ok(ctx.batch_to_logical_plan(batch)?)
}
async fn plan_get_tables(
ctx: &IOxSessionContext,
catalog: Option<String>,
@ -305,3 +332,14 @@ static TABLE_TYPES_RECORD_BATCH: Lazy<RecordBatch> = Lazy::new(|| {
let table_type = Arc::new(StringArray::from_iter_values(["BASE TABLE", "VIEW"])) as ArrayRef;
RecordBatch::try_new(Arc::clone(&GET_TABLE_TYPE_SCHEMA), vec![table_type]).unwrap()
});
static GET_PRIMARY_KEYS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
Arc::new(Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, false),
Field::new("db_schema_name", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("column_name", DataType::Utf8, false),
Field::new("key_name", DataType::Utf8, false),
Field::new("key_sequence", DataType::Int32, false),
]))
});

View File

@ -938,6 +938,52 @@ async fn flightsql_get_db_schema_matches_information_schema() {
.await
}
#[tokio::test]
async fn flightsql_get_primary_keys() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut client = flightsql_client(state.cluster());
let catalog: Option<String> = None;
let db_schema: Option<String> = None;
let stream = client
.get_primary_keys(catalog, db_schema, table_name.to_string())
.await
.unwrap();
let batches = collect_stream(stream).await;
insta::assert_yaml_snapshot!(
batches_to_sorted_lines(&batches),
@r###"
---
- ++
- ++
"###
);
}
.boxed()
})),
],
)
.run()
.await
}
#[tokio::test]
/// Runs the `jdbc_client` program against IOx to verify JDBC via FlightSQL is working
///

View File

@ -132,6 +132,11 @@ public class Main {
System.out.println("**************");
print_result_set(md.getSchemas());
System.out.println("**************");
System.out.println("PrimaryKeys:");
System.out.println("**************");
print_result_set(md.getPrimaryKeys(null, null, "system"));
System.out.println("**************");
System.out.println("Tables:");
System.out.println("**************");

View File

@ -29,8 +29,9 @@ use arrow_flight::{
error::{FlightError, Result},
sql::{
ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt,
CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo,
CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery,
CommandStatementQuery, ProstMessageExt,
},
Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket,
};
@ -181,6 +182,37 @@ impl FlightSqlClient {
self.do_get_with_cmd(msg.as_any()).await
}
/// List the primary keys on this server using a [`CommandGetPrimaryKeys`] message.
///
/// # Parameters
///
/// Definition from <https://github.com/apache/arrow/blob/2fe17338e2d1f85d0c2685d31d2dd51f138b6b80/format/FlightSql.proto#L1261-L1297>
///
/// catalog: Specifies the catalog to search for the table.
/// An empty string retrieves those without a catalog.
/// If omitted the catalog name should not be used to narrow the search.
///
/// db_schema: Specifies the schema to search for the table.
/// An empty string retrieves those without a schema.
/// If omitted the schema name should not be used to narrow the search.
///
/// table: Specifies the table to get the primary keys for.
///
/// This implementation does not support alternate endpoints
pub async fn get_primary_keys(
&mut self,
catalog: Option<impl Into<String> + Send>,
db_schema: Option<impl Into<String> + Send>,
table: String,
) -> Result<FlightRecordBatchStream> {
let msg = CommandGetPrimaryKeys {
catalog: catalog.map(|s| s.into()),
db_schema: db_schema.map(|s| s.into()),
table,
};
self.do_get_with_cmd(msg.as_any()).await
}
/// List the tables on this server using a [`CommandGetTables`] message.
///
/// This implementation does not support alternate endpoints

View File

@ -740,6 +740,7 @@ fn flightsql_permissions(namespace_name: &str, cmd: &FlightSQLCommand) -> Vec<au
FlightSQLCommand::CommandGetSqlInfo(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetCatalogs(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetDbSchemas(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetPrimaryKeys(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetTables(_) => authz::Action::ReadSchema,
FlightSQLCommand::CommandGetTableTypes(_) => authz::Action::ReadSchema,
FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => authz::Action::Read,