feat: FlightSQL Milestone 2 basic FlightSQL client and FlightSQL server implementation and plumbing (#6398)

* feat: Add basic Flight and FlightSQL client into IOx codebase

Basic flight end to end test

* fix: Apply suggestions from code review

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-12-20 12:34:00 -05:00 committed by GitHub
parent d1845057cf
commit e1059a9009
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 415 additions and 17 deletions

3
Cargo.lock generated
View File

@ -4875,7 +4875,6 @@ name = "service_grpc_flight"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"arrow_util",
"assert_matches",
"bytes",
@ -4883,11 +4882,13 @@ dependencies = [
"datafusion",
"futures",
"generated_types",
"iox_arrow_flight",
"iox_query",
"metric",
"observability_deps",
"pin-project",
"prost 0.11.3",
"prost-types 0.11.2",
"serde",
"serde_json",
"service_common",

View File

@ -0,0 +1,66 @@
use arrow_util::assert_batches_sorted_eq;
use futures::{FutureExt, TryStreamExt};
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTest, StepTestState};
#[tokio::test]
async fn flightsql_query() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let mut cluster = MiniCluster::create_shared(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!(
"{},tag1=A,tag2=B val=42i 123456\n\
{},tag1=A,tag2=C val=43i 123457",
table_name, table_name
)),
Step::WaitForReadable,
Step::AssertNotPersisted,
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let sql = format!("select * from {}", table_name);
let expected = vec![
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"| A | C | 1970-01-01T00:00:00.000123457Z | 43 |",
"+------+------+--------------------------------+-----+",
];
let connection = state.cluster().querier().querier_grpc_connection();
let (channel, _headers) = connection.into_grpc_connection().into_parts();
let mut client = iox_arrow_flight::FlightSqlClient::new(channel);
// Add namespace to client headers until it is fully supported by FlightSQL
let namespace = state.cluster().namespace();
client.add_header("iox-namespace-name", namespace).unwrap();
let batches: Vec<_> = client
.query(sql)
.await
.expect("ran SQL query")
.try_collect()
.await
.expect("got batches");
assert_batches_sorted_eq!(&expected, &batches);
}
.boxed()
})),
],
)
.run()
.await
}
// TODO other tests:
// 1. Errors
// 2. Prepared statements

View File

@ -6,6 +6,7 @@ mod cli;
mod compactor;
mod debug;
mod error;
mod flightsql;
mod influxql;
mod ingester;
mod logging;

View File

@ -0,0 +1,160 @@
//! Fork of the Arrow Rust FlightSQL client
//! <https://github.com/apache/arrow-rs/tree/master/arrow-flight/src/sql/client.rs>
//!
//! Plan is to upstream much/all of this to arrow-rs
//! see <https://github.com/apache/arrow-rs/issues/3301>
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow_flight::sql::{CommandStatementQuery, ProstMessageExt};
use arrow_flight::{FlightDescriptor, FlightInfo};
use prost::Message;
use tonic::metadata::MetadataMap;
use tonic::transport::Channel;
use crate::{
error::{FlightError, Result},
FlightClient, FlightRecordBatchStream,
};
/// A FlightSQLServiceClient handles details of interacting with a
/// remote server using the FlightSQL protocol.
#[derive(Debug)]
pub struct FlightSqlClient {
inner: FlightClient,
}
/// An [Arrow Flight SQL](https://arrow.apache.org/docs/format/FlightSql.html) client
/// that can run queries against FlightSql servers.
///
/// If you need more low level control, such as access to response
/// headers, or redirecting to different endpoints, use the lower
/// level [`FlightClient`].
///
/// This client is in the "experimental" stage. It is not guaranteed
/// to follow the spec in all instances. Github issues are welcomed.
impl FlightSqlClient {
/// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
pub fn new(channel: Channel) -> Self {
Self::new_from_flight(FlightClient::new(channel))
}
/// Create a new client from an existing [`FlightClient`]
pub fn new_from_flight(inner: FlightClient) -> Self {
FlightSqlClient { inner }
}
/// Return a reference to the underlying [`FlightClient`]
pub fn inner(&self) -> &FlightClient {
&self.inner
}
/// Return a mutable reference to the underlying [`FlightClient`]
pub fn inner_mut(&mut self) -> &mut FlightClient {
&mut self.inner
}
/// Consume self and return the inner [`FlightClient`]
pub fn into_inner(self) -> FlightClient {
self.inner
}
/// Return a reference to gRPC metadata included with each request
pub fn metadata(&self) -> &MetadataMap {
self.inner.metadata()
}
/// Return a reference to gRPC metadata included with each request
///
/// This can be used, for example, to include authorization or
/// other headers with each request
pub fn metadata_mut(&mut self) -> &mut MetadataMap {
self.inner.metadata_mut()
}
/// Add the specified header with value to all subsequent requests
pub fn add_header(&mut self, key: &str, value: &str) -> Result<()> {
self.inner.add_header(key, value)
}
/// Send `cmd`, encoded as protobuf, to the FlightSQL server
async fn get_flight_info_for_command<M: ProstMessageExt>(
&mut self,
cmd: M,
) -> Result<FlightInfo> {
let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
self.inner.get_flight_info(descriptor).await
}
/// Execute a SQL query on the server using `CommandStatementQuery.
///
/// This involves two round trips
///
/// Step 1: send a [`CommandStatementQuery`] message to the
/// `GetFlightInfo` endpoint of the FlightSQL server to receive a
/// FlightInfo descriptor.
///
/// Step 2: Fetch the results described in the [`FlightInfo`]
///
/// This implementation does not support alternate endpoints
pub async fn query(&mut self, query: String) -> Result<FlightRecordBatchStream> {
let cmd = CommandStatementQuery { query };
let FlightInfo {
schema: _,
flight_descriptor: _,
mut endpoint,
total_records: _,
total_bytes: _,
} = self.get_flight_info_for_command(cmd).await?;
let flight_endpoint = endpoint.pop().ok_or_else(|| {
FlightError::protocol("No endpoint specifed in CommandStatementQuery response")
})?;
// "If the list is empty, the expectation is that the
// ticket can only be redeemed on the current service
// where the ticket was generated."
//
// https://github.com/apache/arrow-rs/blob/a0a5880665b1836890f6843b6b8772d81c463351/format/Flight.proto#L292-L294
if !flight_endpoint.location.is_empty() {
return Err(FlightError::NotYetImplemented(format!(
"FlightEndpoint with non empty 'location' not supported ({:?})",
flight_endpoint.location,
)));
}
if !endpoint.is_empty() {
return Err(FlightError::NotYetImplemented(format!(
"Multiple endpoints returned in CommandStatementQuery response ({})",
endpoint.len() + 1,
)));
}
// Get the underlying ticket
let ticket = flight_endpoint
.ticket
.ok_or_else(|| {
FlightError::protocol(
"No ticket specifed in CommandStatementQuery's FlightInfo response",
)
})?
.ticket;
self.inner.do_get(ticket).await
}
}

View File

@ -25,6 +25,9 @@ pub use client::{
DecodedFlightData, DecodedPayload, FlightClient, FlightDataStream, FlightRecordBatchStream,
};
pub mod flightsql;
pub use flightsql::FlightSqlClient;
/// Reexport all of arrow_flight so this crate can masquarade as
/// `arrow-flight` in the IOx codebase (as the aim is to publish this
/// all upstream)

View File

@ -12,6 +12,7 @@ data_types = { path = "../data_types" }
datafusion = { workspace = true }
generated_types = { path = "../generated_types" }
observability_deps = { path = "../observability_deps" }
prost-types = { version = "0.11", features = ["std"] }
iox_query = { path = "../iox_query" }
service_common = { path = "../service_common" }
trace = { path = "../trace"}
@ -20,9 +21,9 @@ tracker = { path = "../tracker" }
# Crates.io dependencies, in alphabetical order
arrow = { workspace = true, features = ["prettyprint"] }
arrow-flight = { workspace = true }
bytes = "1.3"
futures = "0.3"
iox_arrow_flight = { path = "../iox_arrow_flight" }
pin-project = "1.0"
prost = "0.11"
serde = { version = "1.0", features = ["derive"] }

View File

@ -3,11 +3,6 @@
mod request;
use arrow::error::ArrowError;
use arrow_flight::{
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use arrow_util::optimize::{
prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response,
};
@ -16,6 +11,13 @@ use data_types::NamespaceNameError;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use futures::{SinkExt, Stream, StreamExt};
use generated_types::influxdata::iox::querier::v1 as proto;
use iox_arrow_flight::{
flight_descriptor::DescriptorType,
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
sql::{CommandStatementQuery, ProstMessageExt},
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
use iox_query::{
exec::{ExecutionContextProvider, IOxSessionContext},
QueryCompletedToken, QueryNamespace,
@ -32,12 +34,22 @@ use trace::{ctx::SpanContext, span::SpanExt};
use trace_http::ctx::{RequestLogContext, RequestLogContextExt};
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
/// The name of the grpc header that contains the target iox namespace
/// name for FlightSQL requests.
///
/// See <https://lists.apache.org/thread/fd6r1n7vt91sg2c7fr35wcrsqz6x4645>
/// for discussion on adding support to FlightSQL itself.
const IOX_FLIGHT_SQL_NAMESPACE_HEADER: &str = "iox-namespace-name";
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Invalid ticket. Error: {}", source))]
InvalidTicket { source: request::Error },
#[snafu(display("Internal creating encoding ticket: {}", source))]
InternalCreatingTicket { source: request::Error },
#[snafu(display("Invalid query, could not parse '{}': {}", query, source))]
InvalidQuery {
query: String,
@ -57,6 +69,14 @@ pub enum Error {
source: DataFusionError,
},
#[snafu(display("no 'iox-namespace-name' header in request"))]
NoNamespaceHeader,
#[snafu(display("Invalid 'iox-namespace-name' header in request: {}", source))]
InvalidNamespaceHeader {
source: tonic::metadata::errors::ToStrError,
},
#[snafu(display("Invalid namespace name: {}", source))]
InvalidNamespaceName { source: NamespaceNameError },
@ -70,6 +90,18 @@ pub enum Error {
#[snafu(display("Error during protobuf serialization: {}", source))]
Serialization { source: prost::EncodeError },
#[snafu(display("Invalid protobuf: {}", source))]
Deserialization { source: prost::DecodeError },
#[snafu(display("Invalid protobuf for type_url'{}': {}", type_url, source))]
DeserializationTypeKnown {
type_url: String,
source: prost::DecodeError,
},
#[snafu(display("Unsupported message type: {}", description))]
UnsupportedMessageType { description: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -84,11 +116,21 @@ impl From<Error> for tonic::Status {
Error::NamespaceNotFound { .. }
| Error::InvalidTicket { .. }
| Error::InvalidQuery { .. }
// TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development
// TODO(edd): this should be `debug`. Keeping at info while IOx in early development
| Error::InvalidNamespaceName { .. } => info!(e=%err, msg),
Error::Query { .. } => info!(e=%err, msg),
Error::Optimize { .. }
| Error::Planning { .. } | Error::Serialization { .. } => warn!(e=%err, msg),
|Error::NoNamespaceHeader
|Error::InvalidNamespaceHeader { .. }
| Error::Planning { .. }
| Error::Serialization { .. }
| Error::Deserialization { .. }
| Error::DeserializationTypeKnown { .. }
| Error::InternalCreatingTicket { .. }
| Error::UnsupportedMessageType { .. }
=> {
warn!(e=%err, msg)
}
}
err.into_status()
}
@ -104,15 +146,27 @@ impl Error {
Self::NamespaceNotFound { .. } => tonic::Code::NotFound,
Self::InvalidTicket { .. }
| Self::InvalidQuery { .. }
| Self::Serialization { .. }
| Self::Deserialization { .. }
| Self::DeserializationTypeKnown { .. }
| Self::NoNamespaceHeader
| Self::InvalidNamespaceHeader { .. }
| Self::InvalidNamespaceName { .. } => tonic::Code::InvalidArgument,
Self::Planning { source, .. } | Self::Query { source, .. } => {
datafusion_error_to_tonic_code(&source)
}
Self::Optimize { .. } | Self::Serialization { .. } => tonic::Code::Internal,
Self::UnsupportedMessageType { .. } => tonic::Code::Unimplemented,
Self::InternalCreatingTicket { .. } | Self::Optimize { .. } => tonic::Code::Internal,
};
tonic::Status::new(code, msg)
}
fn unsupported_message_type(description: impl Into<String>) -> Self {
Self::UnsupportedMessageType {
description: description.into(),
}
}
}
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync + 'static>>;
@ -186,7 +240,7 @@ where
type ListFlightsStream = TonicStream<FlightInfo>;
type DoGetStream = TonicStream<FlightData>;
type DoPutStream = TonicStream<PutResult>;
type DoActionStream = TonicStream<arrow_flight::Result>;
type DoActionStream = TonicStream<iox_arrow_flight::Result>;
type ListActionsStream = TonicStream<ActionType>;
type DoExchangeStream = TonicStream<FlightData>;
@ -259,11 +313,43 @@ where
Err(tonic::Status::unimplemented("Not yet implemented"))
}
/// Handles requests encoded in the FlightDescriptor
///
/// IOx currently only processes "cmd" type Descriptors (not
/// paths) and attempts to decodes the [`FlightDescriptor::cmd`]
/// bytes as an encoded protobuf message
///
///
async fn get_flight_info(
&self,
_request: Request<FlightDescriptor>,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
// look for namespace information in headers
let namespace_name = request
.metadata()
.get(IOX_FLIGHT_SQL_NAMESPACE_HEADER)
.map(|v| {
v.to_str()
.context(InvalidNamespaceHeaderSnafu)
.map(|s| s.to_string())
})
.ok_or(Error::NoNamespaceHeader)??;
let request = request.into_inner();
let cmd = match request.r#type() {
DescriptorType::Cmd => Ok(&request.cmd),
DescriptorType::Path => Err(Error::unsupported_message_type("FlightInfo with Path")),
DescriptorType::Unknown => Err(Error::unsupported_message_type(
"FlightInfo of unknown type",
)),
}?;
let message: prost_types::Any =
prost::Message::decode(cmd.as_slice()).context(DeserializationSnafu)?;
let flight_info = self.dispatch(&namespace_name, request, message).await?;
Ok(tonic::Response::new(flight_info))
}
async fn do_put(
@ -295,6 +381,88 @@ where
}
}
impl<S> FlightService<S>
where
S: QueryNamespaceProvider,
{
/// Given a successfully decoded protobuf *Any* message, handles
/// recognized messages (e.g those defined by FlightSQL) and
/// creates the appropriate FlightData response
///
/// Arguments
///
/// namespace_name: is the target namespace of the request
///
/// flight_descriptor: is the descriptor sent in the request (included in response)
///
/// msg is the `cmd` field of the flight descriptor decoded as a protobuf message
async fn dispatch(
&self,
namespace_name: &str,
flight_descriptor: FlightDescriptor,
msg: prost_types::Any,
) -> Result<FlightInfo> {
fn try_unpack<T: ProstMessageExt>(msg: &prost_types::Any) -> Result<Option<T>> {
// Does the type URL match?
if T::type_url() != msg.type_url {
return Ok(None);
}
// type matched, so try and decode
let m = prost::Message::decode(&*msg.value).context(DeserializationTypeKnownSnafu {
type_url: &msg.type_url,
})?;
Ok(Some(m))
}
// FlightSQL CommandStatementQuery
let (schema, ticket) = if let Some(cmd) = try_unpack::<CommandStatementQuery>(&msg)? {
let CommandStatementQuery { query } = cmd;
debug!(%namespace_name, %query, "Handling FlightSQL CommandStatementQuery");
// TODO is supposed to return a schema -- if clients
// actually expect the schema we'll have to plan the query
// here.
let schema = vec![];
// Create a ticket that can be passed to do_get to run the query
let ticket = IoxGetRequest::new(namespace_name, RunQuery::Sql(query))
.try_encode()
.context(InternalCreatingTicketSnafu)?;
(schema, ticket)
} else {
return Err(Error::unsupported_message_type(format!(
"Unsupported cmd message: {}",
msg.type_url
)));
};
// form the response
// Arrow says "set to -1 if not known
let total_records = -1;
let total_bytes = -1;
let endpoint = vec![FlightEndpoint {
ticket: Some(ticket),
// "If the list is empty, the expectation is that the
// ticket can only be redeemed on the current service
// where the ticket was generated."
//
// https://github.com/apache/arrow-rs/blob/a0a5880665b1836890f6843b6b8772d81c463351/format/Flight.proto#L292-L294
location: vec![],
}];
Ok(FlightInfo {
schema,
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records,
total_bytes,
})
}
}
#[pin_project(PinnedDrop)]
struct GetStream {
#[pin]
@ -349,7 +517,7 @@ impl GetStream {
Ok(batch) => {
for batch in split_batch_for_grpc_response(batch) {
let (flight_dictionaries, flight_batch) =
arrow_flight::utils::flight_data_from_arrow_batch(
iox_arrow_flight::utils::flight_data_from_arrow_batch(
&batch, &options,
);

View File

@ -1,9 +1,9 @@
//! Ticket handling for the native IOx Flight API
use arrow_flight::Ticket;
use bytes::Bytes;
use generated_types::influxdata::iox::querier::v1 as proto;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
use iox_arrow_flight::Ticket;
use observability_deps::tracing::trace;
use prost::Message;
use serde::Deserialize;
@ -48,7 +48,6 @@ impl Display for RunQuery {
impl IoxGetRequest {
/// Create a new request to run the specified query
#[allow(dead_code)]
pub fn new(namespace_name: impl Into<String>, query: RunQuery) -> Self {
Self {
namespace_name: namespace_name.into(),
@ -72,7 +71,6 @@ impl IoxGetRequest {
}
/// Encode the request as a protobuf Ticket
#[allow(dead_code)]
pub fn try_encode(self) -> Result<Ticket> {
let Self {
namespace_name,