diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index c7d669c3be..b584c6763a 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -1,13 +1,21 @@ use std::path::PathBuf; use arrow::record_batch::RecordBatch; -use arrow_flight::{decode::FlightRecordBatchStream, sql::SqlInfo}; +use arrow_flight::{ + decode::FlightRecordBatchStream, + sql::{ + Any, CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes, + CommandGetTables, ProstMessageExt, SqlInfo, + }, + FlightClient, FlightDescriptor, +}; use arrow_util::test_util::batches_to_sorted_lines; use assert_cmd::Command; use datafusion::common::assert_contains; use futures::{FutureExt, TryStreamExt}; use influxdb_iox_client::flightsql::FlightSqlClient; use predicates::prelude::*; +use prost::Message; use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState}; #[tokio::test] @@ -625,6 +633,101 @@ async fn flightsql_jdbc() { .await } +/// test ensures that the schema returned as part of GetFlightInfo matches that of the +/// actual response. +#[tokio::test] +async fn flightsql_schema_matches() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let mut cluster = MiniCluster::create_shared2(database_url).await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!( + "{table_name},tag1=A,tag2=B val=42i 123456\n\ + {table_name},tag1=A,tag2=C val=43i 123457" + )), + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let mut client = flightsql_client(state.cluster()).into_inner(); + + // Verify schema for each type of command + let cases = vec![ + // CommandStatementQuery fails because of + // https://github.com/influxdata/influxdb_iox/issues/7279> + // CommandStatementQuery { + // query: format!("select * from {table_name}"), + // } + // .as_any(), + CommandGetSqlInfo { info: vec![] }.as_any(), + CommandGetCatalogs {}.as_any(), + CommandGetDbSchemas { + catalog: None, + db_schema_filter_pattern: None, + } + .as_any(), + CommandGetTables { + catalog: None, + db_schema_filter_pattern: None, + table_name_filter_pattern: None, + table_types: vec![], + include_schema: false, + } + .as_any(), + CommandGetTableTypes {}.as_any(), + ]; + + for cmd in cases { + assert_schema(&mut client, cmd).await; + } + } + .boxed() + })), + ], + ) + .run() + .await +} + +/// Verifies that the schema returned by `GetFlightInfo` and `DoGet` +/// match for `cmd`. +async fn assert_schema(client: &mut FlightClient, cmd: Any) { + println!("Checking schema for message type {}", cmd.type_url); + + let descriptor = FlightDescriptor::new_cmd(cmd.encode_to_vec()); + let flight_info = client.get_flight_info(descriptor).await.unwrap(); + + assert_eq!(flight_info.endpoint.len(), 1); + let ticket = flight_info.endpoint[0] + .ticket + .as_ref() + .expect("Need ticket") + .clone(); + + // Schema reported by `GetFlightInfo` + let flight_info_schema = flight_info.try_decode_schema().unwrap(); + + // Get results and ensure they match the schema reported by GetFlightInfo + let mut result_stream = client.do_get(ticket).await.unwrap(); + let mut saw_data = false; + while let Some(batch) = result_stream.try_next().await.unwrap() { + saw_data = true; + assert_eq!(batch.schema().as_ref(), &flight_info_schema); + // The stream itself also may report a schema + if let Some(stream_schema) = result_stream.schema() { + assert_eq!(stream_schema.as_ref(), &flight_info_schema); + } + } + // verify we have seen at least one RecordBatch + // (all FlightSQL endpoints return at least one) + assert!(saw_data); +} + /// Return a [`FlightSqlClient`] configured for use fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient { let connection = cluster.querier().querier_grpc_connection();