chore(flightsql): Add tests that schema and data matches (#7280)

* fix(flightsql): Tests for matching schema in FligthSQL

* fix: Update influxdb_iox/tests/end_to_end_cases/flightsql.rs
pull/24376/head
Andrew Lamb 2023-03-21 16:40:21 +01:00 committed by GitHub
parent a385bb2ab2
commit 779234eb20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 104 additions and 1 deletions

View File

@ -1,13 +1,21 @@
use std::path::PathBuf;
use arrow::record_batch::RecordBatch;
use arrow_flight::{decode::FlightRecordBatchStream, sql::SqlInfo};
use arrow_flight::{
decode::FlightRecordBatchStream,
sql::{
Any, CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables, ProstMessageExt, SqlInfo,
},
FlightClient, FlightDescriptor,
};
use arrow_util::test_util::batches_to_sorted_lines;
use assert_cmd::Command;
use datafusion::common::assert_contains;
use futures::{FutureExt, TryStreamExt};
use influxdb_iox_client::flightsql::FlightSqlClient;
use predicates::prelude::*;
use prost::Message;
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};
#[tokio::test]
@ -625,6 +633,101 @@ async fn flightsql_jdbc() {
.await
}
/// test ensures that the schema returned as part of GetFlightInfo matches that of the
/// actual response.
#[tokio::test]
async fn flightsql_schema_matches() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{table_name},tag1=A,tag2=B val=42i 123456\n\
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut client = flightsql_client(state.cluster()).into_inner();
// Verify schema for each type of command
let cases = vec![
// CommandStatementQuery fails because of
// https://github.com/influxdata/influxdb_iox/issues/7279>
// CommandStatementQuery {
// query: format!("select * from {table_name}"),
// }
// .as_any(),
CommandGetSqlInfo { info: vec![] }.as_any(),
CommandGetCatalogs {}.as_any(),
CommandGetDbSchemas {
catalog: None,
db_schema_filter_pattern: None,
}
.as_any(),
CommandGetTables {
catalog: None,
db_schema_filter_pattern: None,
table_name_filter_pattern: None,
table_types: vec![],
include_schema: false,
}
.as_any(),
CommandGetTableTypes {}.as_any(),
];
for cmd in cases {
assert_schema(&mut client, cmd).await;
}
}
.boxed()
})),
],
)
.run()
.await
}
/// Verifies that the schema returned by `GetFlightInfo` and `DoGet`
/// match for `cmd`.
async fn assert_schema(client: &mut FlightClient, cmd: Any) {
println!("Checking schema for message type {}", cmd.type_url);
let descriptor = FlightDescriptor::new_cmd(cmd.encode_to_vec());
let flight_info = client.get_flight_info(descriptor).await.unwrap();
assert_eq!(flight_info.endpoint.len(), 1);
let ticket = flight_info.endpoint[0]
.ticket
.as_ref()
.expect("Need ticket")
.clone();
// Schema reported by `GetFlightInfo`
let flight_info_schema = flight_info.try_decode_schema().unwrap();
// Get results and ensure they match the schema reported by GetFlightInfo
let mut result_stream = client.do_get(ticket).await.unwrap();
let mut saw_data = false;
while let Some(batch) = result_stream.try_next().await.unwrap() {
saw_data = true;
assert_eq!(batch.schema().as_ref(), &flight_info_schema);
// The stream itself also may report a schema
if let Some(stream_schema) = result_stream.schema() {
assert_eq!(stream_schema.as_ref(), &flight_info_schema);
}
}
// verify we have seen at least one RecordBatch
// (all FlightSQL endpoints return at least one)
assert!(saw_data);
}
/// Return a [`FlightSqlClient`] configured for use
fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient {
let connection = cluster.querier().querier_grpc_connection();