fix(flightsql): Send correct schema in GetFlightInfo for Queries (#7283)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-03-22 14:58:44 +01:00 committed by GitHub
parent 222d765323
commit 7f7d0b8d52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 65 additions and 37 deletions

1
Cargo.lock generated
View File

@ -1930,6 +1930,7 @@ version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"arrow_util",
"bytes",
"datafusion",
"futures",

24
arrow_util/src/flight.rs Normal file
View File

@ -0,0 +1,24 @@
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
/// Prepare an arrow Schema for transport over the Arrow Flight protocol
///
/// Converts dictionary types to underlying types due to <https://github.com/apache/arrow-rs/issues/3389>
pub fn prepare_schema_for_flight(schema: SchemaRef) -> SchemaRef {
let fields = schema
.fields()
.iter()
.map(|field| match field.data_type() {
DataType::Dictionary(_, value_type) => Field::new(
field.name(),
value_type.as_ref().clone(),
field.is_nullable(),
)
.with_metadata(field.metadata().clone()),
_ => field.clone(),
})
.collect();
Arc::new(Schema::new(fields).with_metadata(schema.metadata().clone()))
}

View File

@ -13,6 +13,7 @@
pub mod bitset;
pub mod dictionary;
pub mod display;
pub mod flight;
pub mod optimize;
pub mod string;
pub mod util;

View File

@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
arrow = { workspace = true, features = ["prettyprint"] }
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
arrow_util = { path = "../arrow_util" }
datafusion = { workspace = true }
observability_deps = { path = "../observability_deps" }
iox_query = { path = "../iox_query" }

View File

@ -14,6 +14,7 @@ use arrow_flight::{
},
IpcMessage, SchemaAsIpc,
};
use arrow_util::flight::prepare_schema_for_flight;
use bytes::Bytes;
use datafusion::{logical_expr::LogicalPlan, physical_plan::ExecutionPlan, scalar::ScalarValue};
use iox_query::{exec::IOxSessionContext, QueryNamespace};
@ -210,7 +211,8 @@ async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result<By
/// returns: IPC encoded (schema_bytes) for this query
fn get_schema_for_plan(logical_plan: LogicalPlan) -> Result<Bytes> {
// gather real schema, but only
let schema = Schema::from(logical_plan.schema().as_ref());
let schema = Arc::new(Schema::from(logical_plan.schema().as_ref())) as _;
let schema = prepare_schema_for_flight(schema);
encode_schema(&schema)
}

View File

@ -1,11 +1,14 @@
use std::path::PathBuf;
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow::{
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
};
use arrow_flight::{
decode::FlightRecordBatchStream,
sql::{
Any, CommandGetCatalogs, CommandGetDbSchemas, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables, ProstMessageExt, SqlInfo,
CommandGetTables, CommandStatementQuery, ProstMessageExt, SqlInfo,
},
FlightClient, FlightDescriptor,
};
@ -658,12 +661,10 @@ async fn flightsql_schema_matches() {
// Verify schema for each type of command
let cases = vec![
// CommandStatementQuery fails because of
// https://github.com/influxdata/influxdb_iox/issues/7279>
// CommandStatementQuery {
// query: format!("select * from {table_name}"),
// }
// .as_any(),
CommandStatementQuery {
query: format!("select * from {table_name}"),
}
.as_any(),
CommandGetSqlInfo { info: vec![] }.as_any(),
CommandGetCatalogs {}.as_any(),
CommandGetDbSchemas {
@ -717,9 +718,21 @@ async fn assert_schema(client: &mut FlightClient, cmd: Any) {
let mut saw_data = false;
while let Some(batch) = result_stream.try_next().await.unwrap() {
saw_data = true;
assert_eq!(batch.schema().as_ref(), &flight_info_schema);
// strip metadata (GetFlightInfo doesn't include metadata for
// some reason) before comparison
// https://github.com/influxdata/influxdb_iox/issues/7282
let batch_schema = strip_metadata(&batch.schema());
assert_eq!(
batch_schema.as_ref(),
&flight_info_schema,
"batch_schema:\n{batch_schema:#?}\n\nflight_info_schema:\n{flight_info_schema:#?}"
);
// The stream itself also may report a schema
if let Some(stream_schema) = result_stream.schema() {
// strip metadata (GetFlightInfo doesn't include metadata for
// some reason) before comparison
// https://github.com/influxdata/influxdb_iox/issues/7282
let stream_schema = strip_metadata(stream_schema);
assert_eq!(stream_schema.as_ref(), &flight_info_schema);
}
}
@ -728,6 +741,16 @@ async fn assert_schema(client: &mut FlightClient, cmd: Any) {
assert!(saw_data);
}
fn strip_metadata(schema: &Schema) -> SchemaRef {
let stripped_fields: Vec<_> = schema
.fields()
.iter()
.map(|f| f.clone().with_metadata(HashMap::new()))
.collect();
Arc::new(Schema::new(stripped_fields))
}
/// Return a [`FlightSqlClient`] configured for use
fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient {
let connection = cluster.querier().querier_grpc_connection();

View File

@ -4,9 +4,7 @@
mod request;
use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
error::ArrowError,
ipc::writer::IpcWriteOptions,
datatypes::SchemaRef, error::ArrowError, ipc::writer::IpcWriteOptions,
record_batch::RecordBatch,
};
use arrow_flight::{
@ -16,6 +14,7 @@ use arrow_flight::{
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use arrow_util::flight::prepare_schema_for_flight;
use bytes::Bytes;
use data_types::NamespaceNameError;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
@ -799,29 +798,6 @@ impl Stream for IOxFlightDataEncoder {
}
}
/// Prepare an arrow Schema for transport over the Arrow Flight protocol
///
/// Convert dictionary types to underlying types
///
/// See hydrate_dictionary for more information
fn prepare_schema_for_flight(schema: SchemaRef) -> SchemaRef {
let fields = schema
.fields()
.iter()
.map(|field| match field.data_type() {
DataType::Dictionary(_, value_type) => Field::new(
field.name(),
value_type.as_ref().clone(),
field.is_nullable(),
)
.with_metadata(field.metadata().clone()),
_ => field.clone(),
})
.collect();
Arc::new(Schema::new(fields).with_metadata(schema.metadata().clone()))
}
impl Stream for GetStream {
type Item = Result<FlightData, tonic::Status>;