diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 886e1d16f1..4710b50b3b 100644 --- a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -4,8 +4,8 @@ use std::fmt::Display; use arrow_flight::sql::{ ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes, - CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo, + CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery, }; use bytes::Bytes; use prost::Message; @@ -78,6 +78,8 @@ pub enum FlightSQLCommand { /// Get a list of the available schemas. See [`CommandGetDbSchemas`] /// for details and how to interpret the parameters. CommandGetDbSchemas(CommandGetDbSchemas), + /// Get a list of primary keys. See [`CommandGetPrimaryKeys`] for details. + CommandGetPrimaryKeys(CommandGetPrimaryKeys), /// Get a list of the available tables CommandGetTables(CommandGetTables), /// Get a list of the available table tyypes @@ -113,6 +115,19 @@ impl Display for FlightSQLCommand { .unwrap_or("") ) } + Self::CommandGetPrimaryKeys(CommandGetPrimaryKeys { + catalog, + db_schema, + table, + }) => { + write!( + f, + "CommandGetPrimaryKeys(catalog={}, db_schema={}, table={})", + catalog.as_ref().map(|c| c.as_str()).unwrap_or(""), + db_schema.as_ref().map(|c| c.as_str()).unwrap_or(""), + table + ) + } Self::CommandGetTables(CommandGetTables { catalog, db_schema_filter_pattern, @@ -173,6 +188,8 @@ impl FlightSQLCommand { Ok(Self::CommandGetCatalogs(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { Ok(Self::CommandGetDbSchemas(decoded_cmd)) + } else if let Some(decode_cmd) = Any::unpack::(&msg)? { + Ok(Self::CommandGetPrimaryKeys(decode_cmd)) } else if let Some(decode_cmd) = Any::unpack::(&msg)? { Ok(Self::CommandGetTables(decode_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { @@ -210,6 +227,7 @@ impl FlightSQLCommand { FlightSQLCommand::CommandGetSqlInfo(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd), + FlightSQLCommand::CommandGetPrimaryKeys(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetTables(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd), FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd), diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index 7733386ef1..731bad0ac7 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -11,8 +11,8 @@ use arrow::{ use arrow_flight::{ sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes, - CommandGetTables, CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo, + CommandGetTableTypes, CommandGetTables, CommandStatementQuery, }, IpcMessage, SchemaAsIpc, }; @@ -67,6 +67,9 @@ impl FlightSQLPlanner { FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { .. }) => { encode_schema(get_db_schemas_schema().as_ref()) } + FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { .. }) => { + encode_schema(&GET_PRIMARY_KEYS_SCHEMA) + } FlightSQLCommand::CommandGetTables(CommandGetTables { include_schema, .. }) => { encode_schema(get_tables_schema(include_schema).as_ref()) } @@ -124,6 +127,20 @@ impl FlightSQLPlanner { let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; Ok(ctx.create_physical_plan(&plan).await?) } + FlightSQLCommand::CommandGetPrimaryKeys(CommandGetPrimaryKeys { + catalog, + db_schema, + table, + }) => { + debug!( + ?catalog, + ?db_schema, + ?table, + "Planning GetPrimaryKeys query" + ); + let plan = plan_get_primary_keys(ctx, catalog, db_schema, table).await?; + Ok(ctx.create_physical_plan(&plan).await?) + } FlightSQLCommand::CommandGetTables(CommandGetTables { catalog, db_schema_filter_pattern, @@ -264,6 +281,16 @@ async fn plan_get_db_schemas( Ok(ctx.batch_to_logical_plan(batch)?) } +async fn plan_get_primary_keys( + ctx: &IOxSessionContext, + _catalog: Option, + _db_schema: Option, + _table: String, +) -> Result { + let batch = RecordBatch::new_empty(Arc::new(Schema::new(vec![]))); + Ok(ctx.batch_to_logical_plan(batch)?) +} + async fn plan_get_tables( ctx: &IOxSessionContext, catalog: Option, @@ -305,3 +332,14 @@ static TABLE_TYPES_RECORD_BATCH: Lazy = Lazy::new(|| { let table_type = Arc::new(StringArray::from_iter_values(["BASE TABLE", "VIEW"])) as ArrayRef; RecordBatch::try_new(Arc::clone(&GET_TABLE_TYPE_SCHEMA), vec![table_type]).unwrap() }); + +static GET_PRIMARY_KEYS_SCHEMA: Lazy = Lazy::new(|| { + Arc::new(Schema::new(vec![ + Field::new("catalog_name", DataType::Utf8, false), + Field::new("db_schema_name", DataType::Utf8, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("column_name", DataType::Utf8, false), + Field::new("key_name", DataType::Utf8, false), + Field::new("key_sequence", DataType::Int32, false), + ])) +}); diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index 6c35d122e1..8db60cf47b 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -938,6 +938,52 @@ async fn flightsql_get_db_schema_matches_information_schema() { .await } +#[tokio::test] +async fn flightsql_get_primary_keys() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!( + "{table_name},tag1=A,tag2=B val=42i 123456\n\ + {table_name},tag1=A,tag2=C val=43i 123457" + )), + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let mut client = flightsql_client(state.cluster()); + let catalog: Option = None; + let db_schema: Option = None; + + let stream = client + .get_primary_keys(catalog, db_schema, table_name.to_string()) + .await + .unwrap(); + let batches = collect_stream(stream).await; + + insta::assert_yaml_snapshot!( + batches_to_sorted_lines(&batches), + @r###" + --- + - ++ + - ++ + "### + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + #[tokio::test] /// Runs the `jdbc_client` program against IOx to verify JDBC via FlightSQL is working /// diff --git a/influxdb_iox/tests/jdbc_client/Main.java b/influxdb_iox/tests/jdbc_client/Main.java index d0db6dd627..32d1e51260 100644 --- a/influxdb_iox/tests/jdbc_client/Main.java +++ b/influxdb_iox/tests/jdbc_client/Main.java @@ -132,6 +132,11 @@ public class Main { System.out.println("**************"); print_result_set(md.getSchemas()); + System.out.println("**************"); + System.out.println("PrimaryKeys:"); + System.out.println("**************"); + print_result_set(md.getPrimaryKeys(null, null, "system")); + System.out.println("**************"); System.out.println("Tables:"); System.out.println("**************"); diff --git a/influxdb_iox_client/src/client/flightsql.rs b/influxdb_iox_client/src/client/flightsql.rs index 717c05872b..be456ca019 100644 --- a/influxdb_iox_client/src/client/flightsql.rs +++ b/influxdb_iox_client/src/client/flightsql.rs @@ -29,8 +29,9 @@ use arrow_flight::{ error::{FlightError, Result}, sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes, - CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetPrimaryKeys, CommandGetSqlInfo, + CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, + CommandStatementQuery, ProstMessageExt, }, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, }; @@ -181,6 +182,37 @@ impl FlightSqlClient { self.do_get_with_cmd(msg.as_any()).await } + /// List the primary keys on this server using a [`CommandGetPrimaryKeys`] message. + /// + /// # Parameters + /// + /// Definition from + /// + /// catalog: Specifies the catalog to search for the table. + /// An empty string retrieves those without a catalog. + /// If omitted the catalog name should not be used to narrow the search. + /// + /// db_schema: Specifies the schema to search for the table. + /// An empty string retrieves those without a schema. + /// If omitted the schema name should not be used to narrow the search. + /// + /// table: Specifies the table to get the primary keys for. + /// + /// This implementation does not support alternate endpoints + pub async fn get_primary_keys( + &mut self, + catalog: Option + Send>, + db_schema: Option + Send>, + table: String, + ) -> Result { + let msg = CommandGetPrimaryKeys { + catalog: catalog.map(|s| s.into()), + db_schema: db_schema.map(|s| s.into()), + table, + }; + self.do_get_with_cmd(msg.as_any()).await + } + /// List the tables on this server using a [`CommandGetTables`] message. /// /// This implementation does not support alternate endpoints diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 8eb0a783b6..737df81e4c 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -740,6 +740,7 @@ fn flightsql_permissions(namespace_name: &str, cmd: &FlightSQLCommand) -> Vec authz::Action::ReadSchema, FlightSQLCommand::CommandGetCatalogs(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetDbSchemas(_) => authz::Action::ReadSchema, + FlightSQLCommand::CommandGetPrimaryKeys(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetTables(_) => authz::Action::ReadSchema, FlightSQLCommand::CommandGetTableTypes(_) => authz::Action::ReadSchema, FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => authz::Action::Read,