refactor(flightsql): Build `GetTableTypes` response directly from the catalog (#7324)

* refactor(flightsql): Do not use plan for GetTableTypes

* test: Add an end to end test
pull/24376/head
Andrew Lamb 2023-03-24 16:20:28 +01:00 committed by GitHub
parent 18609381f0
commit ce77d3bd74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 12 deletions

View File

@ -2,7 +2,7 @@
use std::sync::Arc;
use arrow::{
array::{as_string_array, ArrayRef, BinaryArray, GenericBinaryBuilder},
array::{as_string_array, ArrayRef, BinaryArray, GenericBinaryBuilder, StringArray},
datatypes::{DataType, Field, Schema, SchemaRef},
error::ArrowError,
ipc::writer::IpcWriteOptions,
@ -558,16 +558,22 @@ static GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
});
/// Return a `LogicalPlan` for GetTableTypes
///
/// In the future this could be made more efficient by building the
/// response directly from the IOx catalog rather than running an
/// entire DataFusion plan.
async fn plan_get_table_types(ctx: &IOxSessionContext) -> Result<LogicalPlan> {
let query = "SELECT DISTINCT table_type FROM information_schema.tables ORDER BY table_type";
Ok(ctx.sql_to_logical_plan(query).await?)
Ok(ctx.batch_to_logical_plan(TABLE_TYPES_RECORD_BATCH.clone())?)
}
/// The schema for GetTableTypes
static GET_TABLE_TYPE_SCHEMA: Lazy<Schema> =
Lazy::new(|| Schema::new(vec![Field::new("table_type", DataType::Utf8, false)]));
static GET_TABLE_TYPE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
Arc::new(Schema::new(vec![Field::new(
"table_type",
DataType::Utf8,
false,
)]))
});
static TABLE_TYPES_RECORD_BATCH: Lazy<RecordBatch> = Lazy::new(|| {
// https://github.com/apache/arrow-datafusion/blob/26b8377b0690916deacf401097d688699026b8fb/datafusion/core/src/catalog/information_schema.rs#L285-L287
// IOx doesn't support LOCAL TEMPORARY yet
let table_type = Arc::new(StringArray::from_iter_values(["BASE TABLE", "VIEW"])) as ArrayRef;
RecordBatch::try_new(Arc::clone(&GET_TABLE_TYPE_SCHEMA), vec![table_type]).unwrap()
});

View File

@ -488,6 +488,70 @@ async fn flightsql_get_table_types() {
.await
}
#[tokio::test]
async fn flightsql_get_table_types_matches_information_schema() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut client = flightsql_client(state.cluster());
// output of get_table_types is built manually in
// IOx, so it is important it remains in sync with
// the actual contents of the information schema
fn no_filter() -> Option<String> {
None
}
let stream = client
.get_table_types()
.await
.unwrap();
let get_table_types_batches = collect_stream(stream).await;
let get_table_types_output = batches_to_sorted_lines(&get_table_types_batches);
let sql = "SELECT DISTINCT table_type FROM information_schema.tables ORDER BY table_type";
let stream = client.query(sql).await.unwrap();
let information_schema_batches = collect_stream(stream).await;
let information_schema_output =
batches_to_sorted_lines(&information_schema_batches);
insta::assert_yaml_snapshot!(
get_table_types_output,
@r###"
---
- +------------+
- "| table_type |"
- +------------+
- "| BASE TABLE |"
- "| VIEW |"
- +------------+
"###
);
assert_eq!(get_table_types_output, information_schema_output);
}
.boxed()
})),
],
)
.run()
.await
}
#[tokio::test]
async fn flightsql_get_db_schemas() {
test_helpers::maybe_start_logging();

View File

@ -120,8 +120,13 @@ impl FlightSqlClient {
/// Step 2: Fetch the results described in the [`FlightInfo`]
///
/// This implementation does not support alternate endpoints
pub async fn query(&mut self, query: String) -> Result<FlightRecordBatchStream> {
let msg = CommandStatementQuery { query };
pub async fn query(
&mut self,
query: impl Into<String> + Send,
) -> Result<FlightRecordBatchStream> {
let msg = CommandStatementQuery {
query: query.into(),
};
self.do_get_with_cmd(msg.as_any()).await
}