From 795a79a7adaf5aa8ea4d2026179eb0deb593736f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Mar 2023 20:20:13 +0100 Subject: [PATCH] feat(flightsql): Add `GetTables` metadata endpoint without filters (#7238) * test: Add getTables jdbc_client example * feat: add `CommandGetTables` in FlightSqlClient * feat: add `CommandGetTables` in flightsql cmd and planner * test: add e2e test for `CommandGetTables` * chore: clippy * chore: comment out the test with filters * test: update jdbc test expected value for tables --------- Co-authored-by: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- flightsql/src/cmd.rs | 33 +++++++++- flightsql/src/planner.rs | 57 +++++++++++++++- .../tests/end_to_end_cases/flightsql.rs | 65 +++++++++++++++++++ influxdb_iox/tests/jdbc_client/Main.java | 11 ++++ influxdb_iox_client/src/client/flightsql.rs | 25 ++++++- 5 files changed, 187 insertions(+), 4 deletions(-) diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 40ea6404ae..da20d7e2f7 100644 --- a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -4,8 +4,8 @@ use std::fmt::Display; use arrow_flight::sql::{ ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandPreparedStatementQuery, - CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandGetTables, + CommandPreparedStatementQuery, CommandStatementQuery, }; use bytes::Bytes; use prost::Message; @@ -75,6 +75,8 @@ pub enum FlightSQLCommand { /// Get a list of the available schemas. See [`CommandGetDbSchemas`] /// for details and how to interpret the parameters. CommandGetDbSchemas(CommandGetDbSchemas), + /// Get a list of the available tables + CommandGetTables(CommandGetTables), /// Get a list of the available table tyypes CommandGetTableTypes(CommandGetTableTypes), /// Create a prepared statement @@ -105,6 +107,30 @@ impl Display for FlightSQLCommand { .unwrap_or("") ) } + Self::CommandGetTables(CommandGetTables { + catalog, + db_schema_filter_pattern, + table_name_filter_pattern, + table_types, + include_schema, + }) => { + write!( + f, + "CommandGetTables(catalog={}, db_schema_filter_pattern={},\ + table_name_filter_pattern={},table_types={},include_schema={})", + catalog.as_ref().map(|c| c.as_ref()).unwrap_or(""), + db_schema_filter_pattern + .as_ref() + .map(|db| db.as_ref()) + .unwrap_or(""), + table_name_filter_pattern + .as_ref() + .map(|t| t.as_ref()) + .unwrap_or(""), + table_types.join(","), + include_schema, + ) + } Self::CommandGetTableTypes(CommandGetTableTypes {}) => { write!(f, "CommandGetTableTypes") } @@ -139,6 +165,8 @@ impl FlightSQLCommand { Ok(Self::CommandGetCatalogs(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { Ok(Self::CommandGetDbSchemas(decoded_cmd)) + } else if let Some(decode_cmd) = Any::unpack::(&msg)? { + Ok(Self::CommandGetTables(decode_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { Ok(Self::CommandGetTableTypes(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? @@ -173,6 +201,7 @@ impl FlightSQLCommand { } FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd), + FlightSQLCommand::CommandGetTables(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd), FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd), FlightSQLCommand::ActionClosePreparedStatementRequest(handle) => { diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index d34eee8530..df10c3cbb6 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -5,7 +5,8 @@ use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions}; use arrow_flight::{ sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandGetTables, + CommandStatementQuery, }, IpcMessage, SchemaAsIpc, }; @@ -58,6 +59,24 @@ impl FlightSQLPlanner { // schema instead of recomputing it each time. get_schema_for_plan(plan) } + FlightSQLCommand::CommandGetTables(CommandGetTables { + catalog, + db_schema_filter_pattern, + table_name_filter_pattern, + table_types, + include_schema, + }) => { + let plan = plan_get_tables( + ctx, + catalog, + db_schema_filter_pattern, + table_name_filter_pattern, + table_types, + include_schema, + ) + .await?; + get_schema_for_plan(plan) + } FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => { let plan = plan_get_table_types(ctx).await?; // As an optimization, we could hard code the result @@ -110,6 +129,25 @@ impl FlightSQLPlanner { let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; Ok(ctx.create_physical_plan(&plan).await?) } + FlightSQLCommand::CommandGetTables(CommandGetTables { + catalog, + db_schema_filter_pattern, + table_name_filter_pattern, + table_types, + include_schema, + }) => { + debug!("Planning GetTables query"); + let plan = plan_get_tables( + ctx, + catalog, + db_schema_filter_pattern, + table_name_filter_pattern, + table_types, + include_schema, + ) + .await?; + Ok(ctx.create_physical_plan(&plan).await?) + } FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => { debug!("Planning GetTableTypes query"); let plan = plan_get_table_types(ctx).await?; @@ -253,6 +291,23 @@ async fn plan_get_db_schemas( Ok(plan.with_param_values(params)?) } +/// Return a `LogicalPlan` for GetTables +async fn plan_get_tables( + ctx: &IOxSessionContext, + _catalog: Option, + _db_schema_filter_pattern: Option, + _table_name_filter_pattern: Option, + _table_types: Vec, + _include_schema: bool, +) -> Result { + let query = "SELECT table_catalog AS catalog_name, \ + table_schema AS db_schema_name, table_name, table_type \ + FROM information_schema.tables \ + ORDER BY table_catalog, table_schema, table_name"; + + Ok(ctx.sql_to_logical_plan(query).await?) +} + /// Return a `LogicalPlan` for GetTableTypes /// /// In the future this could be made more efficient by building the diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index 69479e5666..8d61bc65db 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -184,6 +184,58 @@ async fn flightsql_get_catalogs() { .await } +#[tokio::test] +async fn flightsql_get_tables() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!( + "{table_name},tag1=A,tag2=B val=42i 123456\n\ + {table_name},tag1=A,tag2=C val=43i 123457" + )), + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let mut client = flightsql_client(state.cluster()); + + let stream = client + .get_tables(Some(""), Some(""), Some(""), None) + .await + .unwrap(); + let batches = collect_stream(stream).await; + + insta::assert_yaml_snapshot!( + batches_to_sorted_lines(&batches), + @r###" + --- + - +--------------+--------------------+-------------+------------+ + - "| catalog_name | db_schema_name | table_name | table_type |" + - +--------------+--------------------+-------------+------------+ + - "| public | information_schema | columns | VIEW |" + - "| public | information_schema | df_settings | VIEW |" + - "| public | information_schema | tables | VIEW |" + - "| public | information_schema | views | VIEW |" + - "| public | iox | the_table | BASE TABLE |" + - "| public | system | queries | BASE TABLE |" + - +--------------+--------------------+-------------+------------+ + "### + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + #[tokio::test] async fn flightsql_get_table_types() { test_helpers::maybe_start_logging(); @@ -448,6 +500,18 @@ async fn flightsql_jdbc() { information_schema, public\n\ iox, public\n\ system, public"; + // CommandGetTables output + let expected_tables = "**************\n\ + Tables:\n\ + **************\n\ + TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS, TYPE_CAT, TYPE_SCHEM, TYPE_NAME, SELF_REFERENCING_COL_NAME, REF_GENERATION\n\ + ------------\n\ + public, information_schema, columns, VIEW, null, null, null, null, null, null\n\ + public, information_schema, df_settings, VIEW, null, null, null, null, null, null\n\ + public, information_schema, tables, VIEW, null, null, null, null, null, null\n\ + public, information_schema, views, VIEW, null, null, null, null, null, null\n\ + public, iox, the_table, BASE TABLE, null, null, null, null, null, null\n\ + public, system, queries, BASE TABLE, null, null, null, null, null, null"; // CommandGetTableTypes output let expected_table_types = "**************\n\ @@ -466,6 +530,7 @@ async fn flightsql_jdbc() { .success() .stdout(predicate::str::contains(expected_catalogs)) .stdout(predicate::str::contains(expected_schemas)) + .stdout(predicate::str::contains(expected_tables)) .stdout(predicate::str::contains(expected_table_types)); } .boxed() diff --git a/influxdb_iox/tests/jdbc_client/Main.java b/influxdb_iox/tests/jdbc_client/Main.java index 39dd73e292..6270d369ce 100644 --- a/influxdb_iox/tests/jdbc_client/Main.java +++ b/influxdb_iox/tests/jdbc_client/Main.java @@ -132,6 +132,17 @@ public class Main { System.out.println("**************"); print_result_set(md.getSchemas()); + System.out.println("**************"); + System.out.println("Tables:"); + System.out.println("**************"); + // null means no filtering + print_result_set(md.getTables(null, null, null, null)); + + // System.out.println("**************"); + // System.out.println("Tables (system table filter):"); + // System.out.println("**************"); + // print_result_set(md.getTables("public", "system", null, null)); + System.out.println("**************"); System.out.println("Table Types:"); System.out.println("**************"); diff --git a/influxdb_iox_client/src/client/flightsql.rs b/influxdb_iox_client/src/client/flightsql.rs index 7e83a39739..ff436cdc48 100644 --- a/influxdb_iox_client/src/client/flightsql.rs +++ b/influxdb_iox_client/src/client/flightsql.rs @@ -29,7 +29,7 @@ use arrow_flight::{ error::{FlightError, Result}, sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandGetTables, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, }, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, @@ -164,6 +164,29 @@ impl FlightSqlClient { self.do_get_with_cmd(msg.as_any()).await } + /// List the tables on this server using a [`CommandGetTables`] message. + /// + /// This implementation does not support alternate endpoints + /// + /// [`CommandGetTables`]: https://github.com/apache/arrow/blob/44edc27e549d82db930421b0d4c76098941afd71/format/FlightSql.proto#L1176-L1241 + pub async fn get_tables( + &mut self, + _catalog: Option + Send>, + _db_schema_filter_pattern: Option + Send>, + _table_name_filter_pattern: Option + Send>, + _table_types: Option>, + ) -> Result { + let msg = CommandGetTables { + catalog: None, + db_schema_filter_pattern: None, + table_name_filter_pattern: None, + table_types: vec![], + // TODO: implement include_schema after optional query parameters are done + include_schema: false, + }; + self.do_get_with_cmd(msg.as_any()).await + } + /// List the table types on this server using a [`CommandGetTableTypes`] message. /// /// This implementation does not support alternate endpoints