From 7f7d0b8d521f9c245d41b768026831a5249f6ea2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 22 Mar 2023 14:58:44 +0100 Subject: [PATCH] fix(flightsql): Send correct schema in GetFlightInfo for Queries (#7283) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 1 + arrow_util/src/flight.rs | 24 +++++++++++ arrow_util/src/lib.rs | 1 + flightsql/Cargo.toml | 1 + flightsql/src/planner.rs | 4 +- .../tests/end_to_end_cases/flightsql.rs | 43 ++++++++++++++----- service_grpc_flight/src/lib.rs | 28 +----------- 7 files changed, 65 insertions(+), 37 deletions(-) create mode 100644 arrow_util/src/flight.rs diff --git a/Cargo.lock b/Cargo.lock index 4262118d43..1ba6b63fe5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1930,6 +1930,7 @@ version = "0.1.0" dependencies = [ "arrow", "arrow-flight", + "arrow_util", "bytes", "datafusion", "futures", diff --git a/arrow_util/src/flight.rs b/arrow_util/src/flight.rs new file mode 100644 index 0000000000..bfea1e33a2 --- /dev/null +++ b/arrow_util/src/flight.rs @@ -0,0 +1,24 @@ +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + +/// Prepare an arrow Schema for transport over the Arrow Flight protocol +/// +/// Converts dictionary types to underlying types due to +pub fn prepare_schema_for_flight(schema: SchemaRef) -> SchemaRef { + let fields = schema + .fields() + .iter() + .map(|field| match field.data_type() { + DataType::Dictionary(_, value_type) => Field::new( + field.name(), + value_type.as_ref().clone(), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + _ => field.clone(), + }) + .collect(); + + Arc::new(Schema::new(fields).with_metadata(schema.metadata().clone())) +} diff --git a/arrow_util/src/lib.rs b/arrow_util/src/lib.rs index 401cfe7eab..5b4f49f8e3 100644 --- a/arrow_util/src/lib.rs +++ b/arrow_util/src/lib.rs @@ -13,6 +13,7 @@ pub mod bitset; pub mod dictionary; pub mod display; +pub mod flight; pub mod optimize; pub mod string; pub mod util; diff --git a/flightsql/Cargo.toml b/flightsql/Cargo.toml index 1aa7151acc..51789fb6fb 100644 --- a/flightsql/Cargo.toml +++ b/flightsql/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] arrow = { workspace = true, features = ["prettyprint"] } arrow-flight = { workspace = true, features = ["flight-sql-experimental"] } +arrow_util = { path = "../arrow_util" } datafusion = { workspace = true } observability_deps = { path = "../observability_deps" } iox_query = { path = "../iox_query" } diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index 81515fc50e..6df89acb65 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -14,6 +14,7 @@ use arrow_flight::{ }, IpcMessage, SchemaAsIpc, }; +use arrow_util::flight::prepare_schema_for_flight; use bytes::Bytes; use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan, scalar::ScalarValue}; use iox_query::{exec::IOxSessionContext, QueryNamespace}; @@ -210,7 +211,8 @@ async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result Result { // gather real schema, but only - let schema = Schema::from(logical_plan.schema().as_ref()); + let schema = Arc::new(Schema::from(logical_plan.schema().as_ref())) as _; + let schema = prepare_schema_for_flight(schema); encode_schema(&schema) } diff --git a/influxdb_iox/tests/end_to_end_cases/flightsql.rs b/influxdb_iox/tests/end_to_end_cases/flightsql.rs index b584c6763a..151d06e4c5 100644 --- a/influxdb_iox/tests/end_to_end_cases/flightsql.rs +++ b/influxdb_iox/tests/end_to_end_cases/flightsql.rs @@ -1,11 +1,14 @@ -use std::path::PathBuf; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; -use arrow::record_batch::RecordBatch; +use arrow::{ + datatypes::{Schema, SchemaRef}, + record_batch::RecordBatch, +}; use arrow_flight::{ decode::FlightRecordBatchStream, sql::{ Any, CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes, - CommandGetTables, ProstMessageExt, SqlInfo, + CommandGetTables, CommandStatementQuery, ProstMessageExt, SqlInfo, }, FlightClient, FlightDescriptor, }; @@ -658,12 +661,10 @@ async fn flightsql_schema_matches() { // Verify schema for each type of command let cases = vec![ - // CommandStatementQuery fails because of - // https://github.com/influxdata/influxdb_iox/issues/7279> - // CommandStatementQuery { - // query: format!("select * from {table_name}"), - // } - // .as_any(), + CommandStatementQuery { + query: format!("select * from {table_name}"), + } + .as_any(), CommandGetSqlInfo { info: vec![] }.as_any(), CommandGetCatalogs {}.as_any(), CommandGetDbSchemas { @@ -717,9 +718,21 @@ async fn assert_schema(client: &mut FlightClient, cmd: Any) { let mut saw_data = false; while let Some(batch) = result_stream.try_next().await.unwrap() { saw_data = true; - assert_eq!(batch.schema().as_ref(), &flight_info_schema); + // strip metadata (GetFlightInfo doesn't include metadata for + // some reason) before comparison + // https://github.com/influxdata/influxdb_iox/issues/7282 + let batch_schema = strip_metadata(&batch.schema()); + assert_eq!( + batch_schema.as_ref(), + &flight_info_schema, + "batch_schema:\n{batch_schema:#?}\n\nflight_info_schema:\n{flight_info_schema:#?}" + ); // The stream itself also may report a schema if let Some(stream_schema) = result_stream.schema() { + // strip metadata (GetFlightInfo doesn't include metadata for + // some reason) before comparison + // https://github.com/influxdata/influxdb_iox/issues/7282 + let stream_schema = strip_metadata(stream_schema); assert_eq!(stream_schema.as_ref(), &flight_info_schema); } } @@ -728,6 +741,16 @@ async fn assert_schema(client: &mut FlightClient, cmd: Any) { assert!(saw_data); } +fn strip_metadata(schema: &Schema) -> SchemaRef { + let stripped_fields: Vec<_> = schema + .fields() + .iter() + .map(|f| f.clone().with_metadata(HashMap::new())) + .collect(); + + Arc::new(Schema::new(stripped_fields)) +} + /// Return a [`FlightSqlClient`] configured for use fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient { let connection = cluster.querier().querier_grpc_connection(); diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 5f95a5e6ce..f69e814310 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -4,9 +4,7 @@ mod request; use arrow::{ - datatypes::{DataType, Field, Schema, SchemaRef}, - error::ArrowError, - ipc::writer::IpcWriteOptions, + datatypes::SchemaRef, error::ArrowError, ipc::writer::IpcWriteOptions, record_batch::RecordBatch, }; use arrow_flight::{ @@ -16,6 +14,7 @@ use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; +use arrow_util::flight::prepare_schema_for_flight; use bytes::Bytes; use data_types::NamespaceNameError; use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; @@ -799,29 +798,6 @@ impl Stream for IOxFlightDataEncoder { } } -/// Prepare an arrow Schema for transport over the Arrow Flight protocol -/// -/// Convert dictionary types to underlying types -/// -/// See hydrate_dictionary for more information -fn prepare_schema_for_flight(schema: SchemaRef) -> SchemaRef { - let fields = schema - .fields() - .iter() - .map(|field| match field.data_type() { - DataType::Dictionary(_, value_type) => Field::new( - field.name(), - value_type.as_ref().clone(), - field.is_nullable(), - ) - .with_metadata(field.metadata().clone()), - _ => field.clone(), - }) - .collect(); - - Arc::new(Schema::new(fields).with_metadata(schema.metadata().clone())) -} - impl Stream for GetStream { type Item = Result;