diff --git a/flightsql/src/cmd.rs b/flightsql/src/cmd.rs index 283799587a..40ea6404ae 100644 --- a/flightsql/src/cmd.rs +++ b/flightsql/src/cmd.rs @@ -4,7 +4,8 @@ use std::fmt::Display; use arrow_flight::sql::{ ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandPreparedStatementQuery, CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandPreparedStatementQuery, + CommandStatementQuery, }; use bytes::Bytes; use prost::Message; @@ -74,6 +75,8 @@ pub enum FlightSQLCommand { /// Get a list of the available schemas. See [`CommandGetDbSchemas`] /// for details and how to interpret the parameters. CommandGetDbSchemas(CommandGetDbSchemas), + /// Get a list of the available table tyypes + CommandGetTableTypes(CommandGetTableTypes), /// Create a prepared statement ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest), /// Close a prepared statement @@ -102,6 +105,9 @@ impl Display for FlightSQLCommand { .unwrap_or("") ) } + Self::CommandGetTableTypes(CommandGetTableTypes {}) => { + write!(f, "CommandGetTableTypes") + } Self::ActionCreatePreparedStatementRequest(ActionCreatePreparedStatementRequest { query, }) => { @@ -133,6 +139,8 @@ impl FlightSQLCommand { Ok(Self::CommandGetCatalogs(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { Ok(Self::CommandGetDbSchemas(decoded_cmd)) + } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { + Ok(Self::CommandGetTableTypes(decoded_cmd)) } else if let Some(decoded_cmd) = Any::unpack::(&msg)? { Ok(Self::ActionCreatePreparedStatementRequest(decoded_cmd)) @@ -165,6 +173,7 @@ impl FlightSQLCommand { } FlightSQLCommand::CommandGetCatalogs(cmd) => Any::pack(&cmd), FlightSQLCommand::CommandGetDbSchemas(cmd) => Any::pack(&cmd), + FlightSQLCommand::CommandGetTableTypes(cmd) => Any::pack(&cmd), FlightSQLCommand::ActionCreatePreparedStatementRequest(cmd) => Any::pack(&cmd), FlightSQLCommand::ActionClosePreparedStatementRequest(handle) => { let prepared_statement_handle = handle.encode(); diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index 75a23e3514..d34eee8530 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -5,7 +5,7 @@ use arrow::{datatypes::Schema, error::ArrowError, ipc::writer::IpcWriteOptions}; use arrow_flight::{ sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandStatementQuery, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, CommandStatementQuery, }, IpcMessage, SchemaAsIpc, }; @@ -45,6 +45,8 @@ impl FlightSQLPlanner { } FlightSQLCommand::CommandGetCatalogs(CommandGetCatalogs {}) => { let plan = plan_get_catalogs(ctx).await?; + // As an optimization, we could hard code the result + // schema instead of recomputing it each time. get_schema_for_plan(plan) } FlightSQLCommand::CommandGetDbSchemas(CommandGetDbSchemas { @@ -52,6 +54,14 @@ impl FlightSQLPlanner { db_schema_filter_pattern, }) => { let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; + // As an optimization, we could hard code the result + // schema instead of recomputing it each time. + get_schema_for_plan(plan) + } + FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => { + let plan = plan_get_table_types(ctx).await?; + // As an optimization, we could hard code the result + // schema instead of recomputing it each time. get_schema_for_plan(plan) } FlightSQLCommand::ActionCreatePreparedStatementRequest(_) @@ -100,6 +110,11 @@ impl FlightSQLPlanner { let plan = plan_get_db_schemas(ctx, catalog, db_schema_filter_pattern).await?; Ok(ctx.create_physical_plan(&plan).await?) } + FlightSQLCommand::CommandGetTableTypes(CommandGetTableTypes {}) => { + debug!("Planning GetTableTypes query"); + let plan = plan_get_table_types(ctx).await?; + Ok(ctx.create_physical_plan(&plan).await?) + } FlightSQLCommand::ActionClosePreparedStatementRequest(_) | FlightSQLCommand::ActionCreatePreparedStatementRequest(_) => ProtocolSnafu { cmd: format!("{cmd:?}"), @@ -237,3 +252,13 @@ async fn plan_get_db_schemas( debug!(?plan, "Prepared plan is"); Ok(plan.with_param_values(params)?) } + +/// Return a `LogicalPlan` for GetTableTypes +/// +/// In the future this could be made more efficient by building the +/// response directly from the IOx catalog rather than running an +/// entire DataFusion plan. +async fn plan_get_table_types(ctx: &IOxSessionContext) -> Result { + let query = "SELECT DISTINCT table_type FROM information_schema.tables ORDER BY table_type"; + Ok(ctx.sql_to_logical_plan(query).await?) +} diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index 31c1d6f62b..69479e5666 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -184,6 +184,51 @@ async fn flightsql_get_catalogs() { .await } +#[tokio::test] +async fn flightsql_get_table_types() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!( + "{table_name},tag1=A,tag2=B val=42i 123456\n\ + {table_name},tag1=A,tag2=C val=43i 123457" + )), + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let mut client = flightsql_client(state.cluster()); + + let stream = client.get_table_types().await.unwrap(); + let batches = collect_stream(stream).await; + + insta::assert_yaml_snapshot!( + batches_to_sorted_lines(&batches), + @r###" + --- + - +------------+ + - "| table_type |" + - +------------+ + - "| BASE TABLE |" + - "| VIEW |" + - +------------+ + "### + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + #[tokio::test] async fn flightsql_get_db_schemas() { test_helpers::maybe_start_logging(); @@ -394,6 +439,7 @@ async fn flightsql_jdbc() { ------------\n\ public"; + // CommandGetSchemas output let expected_schemas = "**************\n\ Schemas:\n\ **************\n\ @@ -403,6 +449,15 @@ async fn flightsql_jdbc() { iox, public\n\ system, public"; + // CommandGetTableTypes output + let expected_table_types = "**************\n\ + Table Types:\n\ + **************\n\ + TABLE_TYPE\n\ + ------------\n\ + BASE TABLE\n\ + VIEW"; + // Validate metadata: jdbc_client metadata Command::from_std(std::process::Command::new(&path)) .arg(&jdbc_url) @@ -410,7 +465,8 @@ async fn flightsql_jdbc() { .assert() .success() .stdout(predicate::str::contains(expected_catalogs)) - .stdout(predicate::str::contains(expected_schemas)); + .stdout(predicate::str::contains(expected_schemas)) + .stdout(predicate::str::contains(expected_table_types)); } .boxed() })), diff --git a/influxdb_iox/tests/jdbc_client/Main.java b/influxdb_iox/tests/jdbc_client/Main.java index f3c9282b94..39dd73e292 100644 --- a/influxdb_iox/tests/jdbc_client/Main.java +++ b/influxdb_iox/tests/jdbc_client/Main.java @@ -132,6 +132,11 @@ public class Main { System.out.println("**************"); print_result_set(md.getSchemas()); + System.out.println("**************"); + System.out.println("Table Types:"); + System.out.println("**************"); + print_result_set(md.getTableTypes()); + //System.out.println("isReadOnly: " + md.isReadOnly()); //System.out.println("getSearchStringEscape: " + md.getSearchStringEscape()); //System.out.println("getDriverVersion: " + md.getDriverVersion()); diff --git a/influxdb_iox_client/src/client/flightsql.rs b/influxdb_iox_client/src/client/flightsql.rs index 90c4809076..7e83a39739 100644 --- a/influxdb_iox_client/src/client/flightsql.rs +++ b/influxdb_iox_client/src/client/flightsql.rs @@ -29,8 +29,8 @@ use arrow_flight::{ error::{FlightError, Result}, sql::{ ActionCreatePreparedStatementRequest, ActionCreatePreparedStatementResult, Any, - CommandGetCatalogs, CommandGetDbSchemas, CommandPreparedStatementQuery, - CommandStatementQuery, ProstMessageExt, + CommandGetCatalogs, CommandGetDbSchemas, CommandGetTableTypes, + CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt, }, Action, FlightClient, FlightDescriptor, FlightInfo, IpcMessage, Ticket, }; @@ -164,6 +164,16 @@ impl FlightSqlClient { self.do_get_with_cmd(msg.as_any()).await } + /// List the table types on this server using a [`CommandGetTableTypes`] message. + /// + /// This implementation does not support alternate endpoints + /// + /// [`CommandGetTableTypes`]: https://github.com/apache/arrow/blob/44edc27e549d82db930421b0d4c76098941afd71/format/FlightSql.proto#L1243-L1259 + pub async fn get_table_types(&mut self) -> Result { + let msg = CommandGetTableTypes {}; + self.do_get_with_cmd(msg.as_any()).await + } + /// Implements the canonical interaction for most FlightSQL messages: /// /// 1. Call `GetFlightInfo` with the provided message, and get a