diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 4710b50b3b..3968da3fb7 100644 --- a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -4,8 +4,9 @@ use std::fmt::Display; use arrow_flight::sql::{ ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo, - CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys, + CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, + CommandStatementQuery, }; use bytes::Bytes; use prost::Message; @@ -78,6 +79,10 @@ pub enum FlightSQLCommand { /// Get a list of the available schemas. See [`CommandGetDbSchemas`] /// for details and how to interpret the parameters. CommandGetDbSchemas(CommandGetDbSchemas), + /// Get a description of the foreign key columns that reference the given + /// table's primary key columns (the foreign keys exported by a table) of a table. + /// See [`CommandGetExportedKeys`] for details. + CommandGetExportedKeys(CommandGetExportedKeys), /// Get a list of primary keys. See [`CommandGetPrimaryKeys`] for details. CommandGetPrimaryKeys(CommandGetPrimaryKeys), /// Get a list of the available tables @@ -115,6 +120,19 @@ impl Display for FlightSQLCommand { .unwrap_or("") ) } + Self::CommandGetExportedKeys(CommandGetExportedKeys { + catalog, + db_schema, + table, + }) => { + write!( + f, + "CommandGetExportedKeys(catalog={}, db_schema={}, table={})", + catalog.as_ref().map(|c| c.as_str()).unwrap_or(""), + db_schema.as_ref().map(|c| c.as_str()).unwrap_or(""), + table + ) + } Self::CommandGetPrimaryKeys(CommandGetPrimaryKeys { catalog, db_schema, @@ -188,6 +206,8 @@ impl FlightSQLCommand { Ok(Self::CommandGetCatalogs(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { Ok(Self::CommandGetDbSchemas(decoded_cmd)) + } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { + Ok(Self::CommandGetExportedKeys(decoded_cmd)) } else if let Some(decode_cmd) = Any::unpack::(&msg)? { Ok(Self::CommandGetPrimaryKeys(decode_cmd)) } else if let Some(decode_cmd) = Any::unpack::(&msg)? { @@ -227,6 +247,7 @@ impl FlightSQLCommand { FlightSQLCommand::CommandGetSqlInfo(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd), + FlightSQLCommand::CommandGetExportedKeys(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetPrimaryKeys(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetTables(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd), diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index 81104db6e9..6226a8be42 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -11,8 +11,8 @@ use arrow::{ use arrow_flight::{ sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo, - CommandGetTableTypes, CommandGetTables, CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys, + CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandStatementQuery, }, IpcMessage, SchemaAsIpc, }; @@ -67,6 +67,9 @@ impl FlightSQLPlanner { FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { .. }) => { encode_schema(get_db_schemas_schema().as_ref()) } + FlightSQLCommand::CommandGetExportedKeys(CommandGetExportedKeys { .. }) => { + encode_schema(&GET_EXPORTED_KEYS_SCHEMA) + } FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { .. }) => { encode_schema(&GET_PRIMARY_KEYS_SCHEMA) } @@ -127,6 +130,20 @@ impl FlightSQLPlanner { let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; Ok(ctx.create_physical_plan(&plan).await?) } + FlightSQLCommand::CommandGetExportedKeys(CommandGetExportedKeys { + catalog, + db_schema, + table, + }) => { + debug!( + ?catalog, + ?db_schema, + ?table, + "Planning GetExportedKeys query" + ); + let plan = plan_get_exported_keys(ctx, catalog, db_schema, table).await?; + Ok(ctx.create_physical_plan(&plan).await?) + } FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { catalog, db_schema, @@ -281,6 +298,16 @@ async fn plan_get_db_schemas( Ok(ctx.batch_to_logical_plan(batch)?) } +async fn plan_get_exported_keys( + ctx: &IOxSessionContext, + _catalog: Option, + _db_schema: Option, + _table: String, +) -> Result { + let batch = RecordBatch::new_empty(Arc::clone(&GET_EXPORTED_KEYS_SCHEMA)); + Ok(ctx.batch_to_logical_plan(batch)?) +} + async fn plan_get_primary_keys( ctx: &IOxSessionContext, _catalog: Option, @@ -333,6 +360,28 @@ static TABLE_TYPES_RECORD_BATCH: Lazy = Lazy::new(|| { RecordBatch::try_new(Arc::clone(&GET_TABLE_TYPE_SCHEMA), vec![table_type]).unwrap() }); +static GET_EXPORTED_KEYS_SCHEMA: Lazy = Lazy::new(|| { + Arc::new(Schema::new(vec![ + Field::new("pk_catalog_name", DataType::Utf8, false), + Field::new("pk_db_schema_name", DataType::Utf8, false), + Field::new("pk_table_name", DataType::Utf8, false), + Field::new("pk_column_name", DataType::Utf8, false), + Field::new("fk_catalog_name", DataType::Utf8, false), + Field::new("fk_db_schema_name", DataType::Utf8, false), + Field::new("fk_table_name", DataType::Utf8, false), + Field::new("fk_column_name", DataType::Utf8, false), + Field::new("key_sequence", DataType::Int32, false), + Field::new("fk_key_name", DataType::Utf8, false), + Field::new("pk_key_name", DataType::Utf8, false), + // According to the definition in https://github.com/apache/arrow/blob/0434ab65075ecd1d2ab9245bcd7ec6038934ed29/format/FlightSql.proto#L1327-L1328 + // update_rule and delete_rule are in type uint1 + // However, Rust DataType does not have this type, + // the closet is DataType::UInt8 + Field::new("update_rule", DataType::UInt8, false), + Field::new("delete_rule", DataType::UInt8, false), + ])) +}); + static GET_PRIMARY_KEYS_SCHEMA: Lazy = Lazy::new(|| { Arc::new(Schema::new(vec![ Field::new("catalog_name", DataType::Utf8, false), diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index 7c64a13fdb..b2bf6e8a81 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -938,6 +938,52 @@ async fn flightsql_get_db_schema_matches_information_schema() { .await } +#[tokio::test] +async fn flightsql_get_exported_keys() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!( + "{table_name},tag1=A,tag2=B val=42i 123456\n\ + {table_name},tag1=A,tag2=C val=43i 123457" + )), + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let mut client = flightsql_client(state.cluster()); + let catalog: Option = None; + let db_schema: Option = None; + + let stream = client + .get_exported_keys(catalog, db_schema, table_name.to_string()) + .await + .unwrap(); + let batches = collect_stream(stream).await; + + insta::assert_yaml_snapshot!( + batches_to_sorted_lines(&batches), + @r###" + --- + - ++ + - ++ + "### + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + #[tokio::test] async fn flightsql_get_primary_keys() { test_helpers::maybe_start_logging(); diff --git a/influxdb_iox/tests/jdbc_client/Main.java b/influxdb_iox/tests/jdbc_client/Main.java index 32d1e51260..921e4e1798 100644 --- a/influxdb_iox/tests/jdbc_client/Main.java +++ b/influxdb_iox/tests/jdbc_client/Main.java @@ -132,6 +132,11 @@ public class Main { System.out.println("**************"); print_result_set(md.getSchemas()); + System.out.println("**************"); + System.out.println("ExportedKeys"); + System.out.println("**************"); + print_result_set(md.getExportedKeys(null, null, "system")); + System.out.println("**************"); System.out.println("PrimaryKeys:"); System.out.println("**************"); diff --git a/influxdb_iox_client/src/client/flightsql.rs b/influxdb_iox_client/src/client/flightsql.rs index be456ca019..de630f15ce 100644 --- a/influxdb_iox_client/src/client/flightsql.rs +++ b/influxdb_iox_client/src/client/flightsql.rs @@ -29,8 +29,8 @@ use arrow_flight::{ error::{FlightError, Result}, sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo, - CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetExportedKeys, CommandGetPrimaryKeys, + CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, }, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, @@ -182,6 +182,39 @@ impl FlightSqlClient { self.do_get_with_cmd(msg.as_any()).await } + /// List a description of the foreign key columns that reference the given + /// table's primary key columns (the foreign keys exported by a table) of a + /// table on this server using a [`CommandGetExportedKeys`] message. + /// + /// # Parameters + /// + /// Definition from + /// + /// catalog: Specifies the catalog to search for the foreign key table. + /// An empty string retrieves those without a catalog. + /// If omitted the catalog name should not be used to narrow the search. + /// + /// db_schema: Specifies the schema to search for the foreign key table. + /// An empty string retrieves those without a schema. + /// If omitted the schema name should not be used to narrow the search. + /// + /// table: Specifies the foreign key table to get the foreign keys for. + /// + /// This implementation does not support alternate endpoints + pub async fn get_exported_keys( + &mut self, + catalog: Option + Send>, + db_schema: Option + Send>, + table: String, + ) -> Result { + let msg = CommandGetExportedKeys { + catalog: catalog.map(|s| s.into()), + db_schema: db_schema.map(|s| s.into()), + table, + }; + self.do_get_with_cmd(msg.as_any()).await + } + /// List the primary keys on this server using a [`CommandGetPrimaryKeys`] message. /// /// # Parameters diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 75285d4c7e..96c0afbd28 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -795,6 +795,7 @@ fn flightsql_permissions(namespace_name: &str, cmd: &FlightSQLCommand) -> Vec authz::Action::ReadSchema, FlightSQLCommand::CommandGetCatalogs(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetDbSchemas(_) => authz::Action::ReadSchema, + FlightSQLCommand::CommandGetExportedKeys(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetPrimaryKeys(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetTables(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetTableTypes(_) => authz::Action::ReadSchema,