refactor: Move `flightsql` code into its own module, add docs and tests (#6640)

* refactor: Move `flightsql`  code into its own module

* fix: get schema from LogicalPlan

* refactor: use arrow_flight::sql::Any instead of prost_types::any

* fix: cleanup docs and avoid as_ref

* fix: Use Bytes

* fix: use Any::pack

* fix: doclink
pull/24376/head
Andrew Lamb 2023-01-24 19:24:32 +01:00 committed by GitHub
parent 4521516147
commit c3bc61f10e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 673 additions and 185 deletions

23
Cargo.lock generated
View File

@ -1790,6 +1790,24 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flightsql"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"bytes",
"datafusion",
"futures",
"iox_query",
"observability_deps",
"prost 0.11.6",
"snafu",
"tokio",
"tonic",
"workspace-hack",
]
[[package]]
name = "float-cmp"
version = "0.9.0"
@ -5103,8 +5121,11 @@ dependencies = [
name = "service_common"
version = "0.1.0"
dependencies = [
"arrow-flight",
"async-trait",
"bytes",
"datafusion",
"flightsql",
"iox_query",
"metric",
"parking_lot 0.12.1",
@ -5141,13 +5162,13 @@ dependencies = [
"bytes",
"data_types",
"datafusion",
"flightsql",
"futures",
"generated_types",
"iox_query",
"metric",
"observability_deps",
"prost 0.11.6",
"prost-types 0.11.6",
"serde",
"serde_json",
"service_common",

View File

@ -12,7 +12,9 @@ members = [
"datafusion_util",
"dml",
"executor",
"flightsql",
"generated_types",
"garbage_collector",
"grpc-binary-logger-proto",
"grpc-binary-logger-test-proto",
"grpc-binary-logger",
@ -29,7 +31,6 @@ members = [
"ingester2",
"iox_catalog",
"iox_data_generator",
"garbage_collector",
"iox_query",
"iox_tests",
"iox_time",

22
flightsql/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "flightsql"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = { workspace = true, features = ["prettyprint"] }
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
datafusion = { workspace = true }
observability_deps = { path = "../observability_deps" }
iox_query = { path = "../iox_query" }
# Crates.io dependencies, in alphabetical order
bytes = "1.3"
futures = "0.3"
snafu = "0.7"
prost = "0.11"
tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tonic = "0.8"
workspace-hack = { path = "../workspace-hack"}

4
flightsql/src/lib.rs Normal file
View File

@ -0,0 +1,4 @@
//! InfluxDB IOx implementation of FlightSQL
mod planner;
pub use planner::{Error, FlightSQLPlanner, Result};

169
flightsql/src/planner.rs Normal file
View File

@ -0,0 +1,169 @@
//! FlightSQL handling
use std::{string::FromUtf8Error, sync::Arc};
use arrow::{error::ArrowError, ipc::writer::IpcWriteOptions};
use arrow_flight::{
error::FlightError,
sql::{Any, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt},
IpcMessage, SchemaAsIpc,
};
use bytes::Bytes;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use iox_query::{exec::IOxSessionContext, QueryNamespace};
use observability_deps::tracing::debug;
use prost::Message;
use snafu::{ResultExt, Snafu};
/// Errors that can occur while decoding and planning FlightSQL messages.
// The lint is silenced because the variant names are picked to describe the
// failure, and the `Deserialization*` / `*Type*` names trip the check.
// NOTE(review): presumed reason for the `allow` -- confirm.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
/// The message's type URL matched a known FlightSQL command, but its
/// payload could not be decoded as that protobuf message.
#[snafu(display("Invalid protobuf for type_url '{}': {}", type_url, source))]
DeserializationTypeKnown {
type_url: String,
source: prost::DecodeError,
},
/// The bytes expected to hold the query text were not valid UTF-8.
#[snafu(display("Query was not valid UTF-8: {}", source))]
InvalidUtf8 { source: FromUtf8Error },
/// An error reported by Arrow Flight.
#[snafu(display("{}", source))]
Flight { source: FlightError },
/// An error reported by DataFusion while planning the query.
#[snafu(display("{}", source))]
DataFusion { source: DataFusionError },
/// An error reported by Arrow (e.g. while encoding a schema as IPC).
#[snafu(display("{}", source))]
Arrow { source: ArrowError },
/// The message's type URL is not one of the supported FlightSQL commands.
#[snafu(display("Unsupported FlightSQL message type: {}", description))]
UnsupportedMessageType { description: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Wraps an Arrow Flight error in [`Error::Flight`].
impl From<FlightError> for Error {
    fn from(source: FlightError) -> Self {
        Self::Flight { source }
    }
}
impl From<Error> for DataFusionError {
fn from(value: Error) -> Self {
match value {
Error::DataFusion { source } => source,
Error::Arrow { source } => DataFusionError::ArrowError(source),
value => DataFusionError::External(Box::new(value)),
}
}
}
/// Logic for creating plans for various Flight messages against a query database
#[derive(Debug, Default)]
pub struct FlightSQLPlanner {}
impl FlightSQLPlanner {
pub fn new() -> Self {
Self {}
}
/// Returns the schema, in Arrow IPC encoded form, for the request in msg.
pub async fn get_flight_info(
namespace_name: impl Into<String>,
msg: Any,
ctx: &IOxSessionContext,
) -> Result<Bytes> {
let namespace_name = namespace_name.into();
debug!(%namespace_name, type_url=%msg.type_url, "Handling flightsql get_flight_info");
match FlightSQLCommand::try_new(&msg)? {
FlightSQLCommand::CommandStatementQuery(query)
| FlightSQLCommand::CommandPreparedStatementQuery(query) => {
Self::get_schema_for_query(&query, ctx).await
}
}
}
/// Return the schema for the specified query
///
/// returns: IPC encoded (schema_bytes) for this query
async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result<Bytes> {
// gather real schema, but only
let logical_plan = ctx.plan_sql(query).await.context(DataFusionSnafu)?;
let schema = arrow::datatypes::Schema::from(logical_plan.schema().as_ref());
let options = IpcWriteOptions::default();
// encode the schema into the correct form
let IpcMessage(schema) = SchemaAsIpc::new(&schema, &options)
.try_into()
.context(ArrowSnafu)?;
Ok(schema)
}
/// Returns a plan that computes results requested in msg
pub async fn do_get(
namespace_name: impl Into<String>,
_database: Arc<dyn QueryNamespace>,
msg: Any,
ctx: &IOxSessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let namespace_name = namespace_name.into();
debug!(%namespace_name, type_url=%msg.type_url, "Handling flightsql plan to run an actual query");
match FlightSQLCommand::try_new(&msg)? {
FlightSQLCommand::CommandStatementQuery(query) => {
debug!(%query, "Planning FlightSQL query");
ctx.prepare_sql(&query).await.context(DataFusionSnafu)
}
FlightSQLCommand::CommandPreparedStatementQuery(query) => {
debug!(%query, "Planning FlightSQL prepared query");
ctx.prepare_sql(&query).await.context(DataFusionSnafu)
}
}
}
}
/// Decoded and validated FlightSQL command
///
/// Each variant carries the text of the query to plan.
#[derive(Debug, Clone)]
enum FlightSQLCommand {
/// The `query` field of a `CommandStatementQuery` message.
CommandStatementQuery(String),
/// The `prepared_statement_handle` of a `CommandPreparedStatementQuery`
/// message, interpreted as UTF-8 query text.
CommandPreparedStatementQuery(String),
}
impl FlightSQLCommand {
    /// Works out which FlightSQL command `msg` holds and decodes it.
    ///
    /// Returns [`Error::UnsupportedMessageType`] when `msg` is none of the
    /// supported commands, and a decoding error when the type matched but
    /// the payload is malformed.
    fn try_new(msg: &Any) -> Result<Self> {
        if let Some(cmd) = try_unpack::<CommandStatementQuery>(msg)? {
            let CommandStatementQuery { query } = cmd;
            return Ok(Self::CommandStatementQuery(query));
        }

        if let Some(cmd) = try_unpack::<CommandPreparedStatementQuery>(msg)? {
            let CommandPreparedStatementQuery {
                prepared_statement_handle: handle,
            } = cmd;

            // the handle is expected to hold the query text as UTF-8
            return String::from_utf8(handle.to_vec())
                .context(InvalidUtf8Snafu)
                .map(Self::CommandPreparedStatementQuery);
        }

        UnsupportedMessageTypeSnafu {
            description: &msg.type_url,
        }
        .fail()
    }
}
/// Attempts to decode the [`arrow_flight::sql::Any`] as a message of type `T`.
///
/// Returns `Ok(None)` when the type URL of `msg` is not the one for `T`, and
/// `Err` when the type URL matches but the payload fails to decode.
fn try_unpack<T: ProstMessageExt>(msg: &Any) -> Result<Option<T>> {
    if T::type_url() == msg.type_url {
        // The type URL matches, so the payload must decode as `T`
        T::decode(&*msg.value)
            .map(Some)
            .context(DeserializationTypeKnownSnafu {
                type_url: &msg.type_url,
            })
    } else {
        Ok(None)
    }
}

View File

@ -3,38 +3,49 @@ package influxdata.iox.querier.v1;
option go_package = "github.com/influxdata/iox/querier/v1";
/*
* Message definitions for the InfluxDB IOx Flight API
* Message definition for the native InfluxDB IOx Flight API
*
* These messages are what is sent to/from an InfluxDB IOx server's
* `DoGet` endpoint as the opaque "Ticket" in Arrow Flight messages.
* ReadInfo is sent to an InfluxDB IOx Querier server's `DoGet` RPC
* method as the opaque "Ticket" in Arrow Flight messages.
*
* The bytes for the Tickets are created by encoding these messages
* using the protobuf binary format.
* Tickets are created by encoding these messages using the protobuf
* binary format.
*
* Clients can construct these Ticket's directly to avoid making two
* requests to run each query
* IOx clients can construct these Tickets directly to avoid making
* two RPC requests as typically required by Arrow Flight (a
* `GetFlightInfo` followed by a `DoGet`).
*/
// Request for an IOx querier to execute a query on a user's behalf.
message ReadInfo {
// Namespace name.
string namespace_name = 1;
// query text (either SQL or InfluxQL, depending on query_type)
// Query text (either SQL or InfluxQL, depending on query_type)
string sql_query = 2;
// A FlightSQL command payload (serialized protobuf bytes). One of
// the messages defined in the [protobuf definition].
//
// [protobuf definition]: https://arrow.apache.org/docs/format/FlightSql.html#protocol-buffer-definitions
bytes flightsql_command = 4;
// The type of query
QueryType query_type = 3;
enum QueryType {
// An unspecified query type. IOx may choose how to interpret sql_query.
QUERY_TYPE_UNSPECIFIED = 0;
// SQL query.
// SQL query. `sql_query` contains a SQL query as text
QUERY_TYPE_SQL = 1;
// InfluxQL query.
// InfluxQL query. `sql_query` contains an InfluxQL query as text
QUERY_TYPE_INFLUX_QL = 2;
// FlightSQL message: `sql_query` is empty, flightsql_command
// contains a serialized FlightSQL message.
QUERY_TYPE_FLIGHT_SQL_MESSAGE = 3;
}
}
// Message included in the DoGet response from the querier

View File

@ -180,6 +180,7 @@ impl Client {
namespace_name,
sql_query,
query_type: QueryType::Sql.into(),
flightsql_command: vec![],
};
self.do_get_with_read_info(request).await
@ -196,6 +197,7 @@ impl Client {
namespace_name,
sql_query: influxql_query,
query_type: QueryType::InfluxQl.into(),
flightsql_command: vec![],
};
self.do_get_with_read_info(request).await

View File

@ -312,16 +312,20 @@ impl IOxSessionContext {
&self.inner
}
/// Plan a SQL statement, returning its logical plan. This assumes
/// that any tables referenced in the SQL have been registered with
/// this context. Use `prepare_sql` to create a physical plan that
/// can be executed.
pub async fn plan_sql(&self, sql: &str) -> Result<LogicalPlan> {
// Planning is done on a child context named after this method
let ctx = self.child_ctx("plan_sql");
debug!(text=%sql, "planning SQL query");
// NOTE can not use ctx.inner.sql() here as it also interprets DDL
ctx.inner.state().create_logical_plan(sql).await
}
/// Prepare a SQL statement for execution. This assumes that any
/// tables referenced in the SQL have been registered with this context
pub async fn prepare_sql(&self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
let ctx = self.child_ctx("prepare_sql");
debug!(text=%sql, "planning SQL query");
// NOTE can not use ctx.inner.sql() here as it also interprets DDL
#[allow(deprecated)]
let logical_plan = ctx.inner.state().create_logical_plan(sql).await?;
debug!(plan=%logical_plan.display_graphviz(), "logical plan");
let logical_plan = self.plan_sql(sql).await?;
// Make nicer errors for unsupported SQL
// (By default datafusion returns Internal Error)
@ -347,6 +351,7 @@ impl IOxSessionContext {
_ => (),
}
let ctx = self.child_ctx("prepare_sql");
ctx.create_physical_plan(&logical_plan).await
}

View File

@ -7,8 +7,11 @@ license.workspace = true
[dependencies] # In alphabetical order
async-trait = "0.1.63"
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
bytes = "1.3"
datafusion = { workspace = true }
iox_query = { path = "../iox_query" }
flightsql = { path = "../flightsql" }
metric = { path = "../metric" }
parking_lot = "0.12"
predicate = { path = "../predicate" }

View File

@ -1,7 +1,10 @@
//! Query planner wrapper for use in IOx services
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use arrow_flight::sql::Any;
use bytes::Bytes;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use flightsql::FlightSQLPlanner;
use iox_query::{
exec::IOxSessionContext,
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
@ -17,7 +20,7 @@ use predicate::rpc_predicate::InfluxRpcPredicate;
///
/// Query planning was, at time of writing, a single threaded
/// affair. In order to avoid tying up the tokio executor that is
/// handling API requests, we plan queries using a separate thread
/// handling API requests, IOx plans queries using a separate thread
/// pool.
pub struct Planner {
/// Executors (whose threadpool to use)
@ -60,6 +63,50 @@ impl Planner {
.await
}
/// Plans a FlightSQL `DoGet` message on a separate threadpool.
///
/// See [`FlightSQLPlanner::do_get`] for the planning itself. Any
/// FlightSQL error is converted to a [`DataFusionError`].
pub async fn flight_sql_do_get<N>(
    &self,
    namespace_name: impl Into<String>,
    namespace: Arc<N>,
    msg: Any,
) -> Result<Arc<dyn ExecutionPlan>>
where
    N: QueryNamespace + 'static,
{
    // Own everything the planning future needs so it can be moved
    let namespace_name = namespace_name.into();
    let ctx = self.ctx.child_ctx("planner flight_sql_do_get");

    let planning = async move {
        FlightSQLPlanner::do_get(namespace_name, namespace, msg, &ctx)
            .await
            .map_err(DataFusionError::from)
    };

    self.ctx.run(planning).await
}
/// Computes the response to a FlightSQL `GetFlightInfo` message on a
/// separate threadpool.
///
/// See [`FlightSQLPlanner::get_flight_info`] for details. Any FlightSQL
/// error is converted to a [`DataFusionError`].
pub async fn flight_sql_get_flight_info(
    &self,
    namespace_name: impl Into<String>,
    msg: Any,
) -> Result<Bytes> {
    // Own everything the planning future needs so it can be moved
    let namespace_name = namespace_name.into();
    let ctx = self.ctx.child_ctx("planner flight_sql_get_flight_info");

    let planning = async move {
        FlightSQLPlanner::get_flight_info(namespace_name, msg, &ctx)
            .await
            .map_err(DataFusionError::from)
    };

    self.ctx.run(planning).await
}
/// Creates a plan as described on
/// [`InfluxRpcPlanner::table_names`], on a separate threadpool
pub async fn table_names<N>(

View File

@ -10,9 +10,9 @@ license.workspace = true
arrow_util = { path = "../arrow_util" }
data_types = { path = "../data_types" }
datafusion = { workspace = true }
flightsql = { path = "../flightsql" }
generated_types = { path = "../generated_types" }
observability_deps = { path = "../observability_deps" }
prost-types = { version = "0.11", features = ["std"] }
iox_query = { path = "../iox_query" }
service_common = { path = "../service_common" }
trace = { path = "../trace"}

View File

@ -1,4 +1,5 @@
//! Implements the InfluxDB IOx Flight API using Arrow Flight and gRPC
//! Implements the InfluxDB IOx Flight API and Arrow FlightSQL, based
//! on Arrow Flight and gRPC. See [`FlightService`] for full detail.
mod request;
@ -11,7 +12,7 @@ use arrow_flight::{
error::FlightError,
flight_descriptor::DescriptorType,
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
sql::{CommandStatementQuery, ProstMessageExt},
sql::Any,
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
@ -28,7 +29,7 @@ use observability_deps::tracing::{debug, info, warn};
use prost::Message;
use request::{IoxGetRequest, RunQuery};
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider};
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant};
use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt};
@ -51,11 +52,8 @@ pub enum Error {
#[snafu(display("Internal creating encoding ticket: {}", source))]
InternalCreatingTicket { source: request::Error },
#[snafu(display("Invalid query, could not parse '{}': {}", query, source))]
InvalidQuery {
query: String,
source: serde_json::Error,
},
#[snafu(display("Invalid handshake. No payload provided"))]
InvalidHandshake {},
#[snafu(display("Namespace {} not found", namespace_name))]
NamespaceNotFound { namespace_name: String },
@ -89,18 +87,12 @@ pub enum Error {
source: service_common::planner::Error,
},
#[snafu(display("Error during protobuf serialization: {}", source))]
Serialization { source: prost::EncodeError },
#[snafu(display("Error while planning FlightSQL : {}", source))]
FlightSQLPlanning { source: flightsql::Error },
#[snafu(display("Invalid protobuf: {}", source))]
Deserialization { source: prost::DecodeError },
#[snafu(display("Invalid protobuf for type_url'{}': {}", type_url, source))]
DeserializationTypeKnown {
type_url: String,
source: prost::DecodeError,
},
#[snafu(display("Unsupported message type: {}", description))]
UnsupportedMessageType { description: String },
}
@ -116,7 +108,7 @@ impl From<Error> for tonic::Status {
match err {
Error::NamespaceNotFound { .. }
| Error::InvalidTicket { .. }
| Error::InvalidQuery { .. }
| Error::InvalidHandshake { .. }
// TODO(edd): this should be `debug`. Keeping at info while IOx in early development
| Error::InvalidNamespaceName { .. } => info!(e=%err, msg),
Error::Query { .. } => info!(e=%err, msg),
@ -124,11 +116,10 @@ impl From<Error> for tonic::Status {
|Error::NoNamespaceHeader
|Error::InvalidNamespaceHeader { .. }
| Error::Planning { .. }
| Error::Serialization { .. }
| Error::Deserialization { .. }
| Error::DeserializationTypeKnown { .. }
| Error::InternalCreatingTicket { .. }
| Error::UnsupportedMessageType { .. }
| Error::FlightSQLPlanning { .. }
=> {
warn!(e=%err, msg)
}
@ -146,10 +137,8 @@ impl Error {
let code = match self {
Self::NamespaceNotFound { .. } => tonic::Code::NotFound,
Self::InvalidTicket { .. }
| Self::InvalidQuery { .. }
| Self::Serialization { .. }
| Self::InvalidHandshake { .. }
| Self::Deserialization { .. }
| Self::DeserializationTypeKnown { .. }
| Self::NoNamespaceHeader
| Self::InvalidNamespaceHeader { .. }
| Self::InvalidNamespaceName { .. } => tonic::Code::InvalidArgument,
@ -157,6 +146,18 @@ impl Error {
datafusion_error_to_tonic_code(&source)
}
Self::UnsupportedMessageType { .. } => tonic::Code::Unimplemented,
Error::FlightSQLPlanning { source } => match source {
flightsql::Error::DeserializationTypeKnown { .. }
| flightsql::Error::InvalidUtf8 { .. }
| flightsql::Error::UnsupportedMessageType { .. } => tonic::Code::InvalidArgument,
flightsql::Error::Flight { source: e } => return tonic::Status::from(e),
fs_err @ flightsql::Error::Arrow { .. } => {
// wrap in Datafusion error to walk source stacks
let df_error = DataFusionError::from(fs_err);
datafusion_error_to_tonic_code(&df_error)
}
flightsql::Error::DataFusion { source } => datafusion_error_to_tonic_code(&source),
},
Self::InternalCreatingTicket { .. } | Self::Optimize { .. } => tonic::Code::Internal,
};
@ -172,7 +173,105 @@ impl Error {
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + 'static>>;
/// Concrete implementation of the gRPC Arrow Flight Service API
/// Concrete implementation of the IOx client protocol, implemented as
/// a gRPC [Arrow Flight] Service API
///
/// # Tickets
///
/// Creating and serializing the `Ticket` structure used in IOx Arrow
/// Flight API is handled by [`IoxGetRequest`]. See that for more
/// details.
///
/// # Native IOx API ad-hoc query
///
/// To run a query with the native IOx API, a client needs to
///
/// 1. Encode the query string as a `Ticket` (see [`IoxGetRequest`]).
///
/// 2. Call the `DoGet` method with the `Ticket`,
///
/// 3. Receive a stream of data encoded as [`FlightData`]
///
/// ```text
/// .───────.
/// ╔═══════════╗ ( )
/// ║ ║ │`───────'│
/// ║ Client ║ │ IOx │
/// ║ ║ │.───────.│
/// ║ ║ ( )
/// ╚═══════════╝ `───────'
/// ┃ Creates a ┃
/// 1 ┃ Ticket ┃
/// ┃ ┃
/// ┃ ┃
/// 2 ┃ DoGet(Ticket) ┃
/// ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃
/// ┃ ┃
/// ┃ Stream of FlightData ┃
/// 3 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃
/// ```
///
/// # FlightSQL
///
/// IOx also supports [Arrow FlightSQL]. In addition to `DoGet`,
/// FlightSQL clients call additional Arrow Flight RPC methods such as
/// `GetFlightInfo`, `GetSchema`, `DoPut`, and `DoAction`.
///
/// ## FlightSQL List tables (NOT YET IMPLEMENTED)
///
/// TODO sequence diagram for List Tables
///
/// ## FlightSQL ad-hoc query
///
/// To run an ad-hoc query, via FlightSQL, the client needs to
///
/// 1. Encode the query in a `CommandStatementQuery` FlightSQL
/// structure in a [`FlightDescriptor`]
///
/// 2. Call the `GetFlightInfo` method with the [`FlightDescriptor`]
///
/// 3. Receive a `Ticket` in the returned [`FlightInfo`]. The Ticket is
/// opaque (uninterpreted) by the client. It contains an
/// [`IoxGetRequest`] with the `CommandStatementQuery` request.
///
/// 4. Call the `DoGet` method with the `Ticket` from the previous step.
///
/// 5. Receive a stream of data encoded as [`FlightData`]
///
/// ```text
/// .───────.
/// ╔═══════════╗ ( )
/// ║ ║ │`───────'│
/// ║ FlightSQL ║ │ IOx │
/// ║ Client ║ │.───────.│
/// ║ ║ ( )
/// ╚═══════════╝ `───────'
/// ┃ Creates a ┃
/// 1 ┃ CommandStatementQuery ┃
/// ┃ ┃
/// ┃ ┃
/// ┃ ┃
/// 2 ┃ GetFlightInfo(CommandStatementQuery) ┃
/// ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃
/// ┃ FlightInfo{..Ticket{ ┃
/// ┃ CommandStatementQuery ┃
/// 3 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃
/// ┃ ┃
/// ┃ ┃
/// ┃ DoGet(Ticket) ┃
/// 4 ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃
/// ┃ ┃
/// ┃ Stream of FlightData ┃
/// 5 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃
/// ┃ ┃
/// ```
///
/// ## FlightSQL Prepared Statement (NOT YET IMPLEMENTED)
///
/// TODO sequence diagram
///
/// [Arrow Flight]: https://arrow.apache.org/docs/format/Flight.html
/// [Arrow FlightSQL]: https://arrow.apache.org/docs/format/FlightSql.html
#[derive(Debug)]
struct FlightService<S>
where
@ -192,7 +291,8 @@ impl<S> FlightService<S>
where
S: QueryNamespaceProvider,
{
async fn run_query(
/// Implementation of the `DoGet` method
async fn run_do_get(
&self,
span_ctx: Option<SpanContext>,
permit: InstrumentedAsyncOwnedSemaphorePermit,
@ -223,6 +323,14 @@ where
.context(PlanningSnafu)?;
(token, plan)
}
RunQuery::FlightSql(msg) => {
let token = db.record_query(&ctx, "flightsql", Box::new(msg.type_url.clone()));
let plan = Planner::new(&ctx)
.flight_sql_do_get(&namespace, db, msg.clone())
.await
.context(PlanningSnafu)?;
(token, plan)
}
};
let output =
@ -249,7 +357,9 @@ where
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
Err(tonic::Status::unimplemented(
"Not yet implemented: get_schema",
))
}
async fn do_get(
@ -279,17 +389,17 @@ where
// Log after we acquire the permit and are about to start execution
let start = Instant::now();
info!(%namespace_name, %query, %trace, "Running SQL via flight do_get");
info!(%namespace_name, %query, %trace, "DoGet request");
let response = self
.run_query(span_ctx, permit, query, namespace_name.to_string())
.run_do_get(span_ctx, permit, query, namespace_name.to_string())
.await;
if let Err(e) = &response {
info!(%namespace_name, %query, %trace, %e, "Error running SQL query");
info!(%namespace_name, %query, %trace, %e, "Error running DoGet");
} else {
let elapsed = Instant::now() - start;
debug!(%namespace_name, %query,%trace, ?elapsed, "Completed SQL query successfully");
debug!(%namespace_name, %query, %trace, ?elapsed, "Completed DoGet request");
}
response
}
@ -298,7 +408,12 @@ where
&self,
request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, tonic::Status> {
let request = request.into_inner().message().await?.unwrap();
let request = request
.into_inner()
.message()
.await?
.context(InvalidHandshakeSnafu)?;
let response = HandshakeResponse {
protocol_version: request.protocol_version,
payload: request.payload,
@ -311,20 +426,24 @@ where
&self,
_request: Request<Criteria>,
) -> Result<Response<Self::ListFlightsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
Err(tonic::Status::unimplemented(
"Not yet implemented: list_flights",
))
}
/// Handles requests encoded in the FlightDescriptor
///
/// IOx currently only processes "cmd" type Descriptors (not
/// paths) and attempts to decodes the [`FlightDescriptor::cmd`]
/// bytes as an encoded protobuf message
///
/// Handles `GetFlightInfo` RPC requests. The [`FlightDescriptor`]
/// is treated as containing a FlightSQL command, encoded as a binary
/// ProtoBuf message.
///
/// see [`FlightService`] for more details.
async fn get_flight_info(
&self,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, tonic::Status> {
let external_span_ctx: Option<RequestLogContext> = request.extensions().get().cloned();
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let trace = external_span_ctx.format_jaeger();
// look for namespace information in headers
let namespace_name = request
.metadata()
@ -336,111 +455,43 @@ where
})
.ok_or(Error::NoNamespaceHeader)??;
let request = request.into_inner();
let flight_descriptor = request.into_inner();
let cmd = match request.r#type() {
DescriptorType::Cmd => Ok(&request.cmd),
DescriptorType::Path => Err(Error::unsupported_message_type("FlightInfo with Path")),
DescriptorType::Unknown => Err(Error::unsupported_message_type(
"FlightInfo of unknown type",
)),
}?;
// extract the FlightSQL message
let msg = msg_from_descriptor(flight_descriptor.clone())?;
let message: prost_types::Any =
prost::Message::decode(cmd.as_ref()).context(DeserializationSnafu)?;
let type_url = &msg.type_url;
info!(%namespace_name, %type_url, %trace, "GetFlightInfo request");
let flight_info = self.dispatch(&namespace_name, request, message).await?;
Ok(tonic::Response::new(flight_info))
}
async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
}
impl<S> FlightService<S>
where
S: QueryNamespaceProvider,
{
/// Given a successfully decoded protobuf *Any* message, handles
/// recognized messages (e.g those defined by FlightSQL) and
/// creates the appropriate FlightData response
///
/// Arguments
///
/// namespace_name: is the target namespace of the request
///
/// flight_descriptor: is the descriptor sent in the request (included in response)
///
/// msg is the `cmd` field of the flight descriptor decoded as a protobuf message
async fn dispatch(
&self,
namespace_name: &str,
flight_descriptor: FlightDescriptor,
msg: prost_types::Any,
) -> Result<FlightInfo> {
fn try_unpack<T: ProstMessageExt>(msg: &prost_types::Any) -> Result<Option<T>> {
// Does the type URL match?
if T::type_url() != msg.type_url {
return Ok(None);
}
// type matched, so try and decode
let m = prost::Message::decode(&*msg.value).context(DeserializationTypeKnownSnafu {
type_url: &msg.type_url,
let db = self
.server
.db(&namespace_name, span_ctx.child_span("get namespace"))
.await
.ok_or_else(|| {
tonic::Status::not_found(format!("Unknown namespace: {namespace_name}"))
})?;
Ok(Some(m))
}
// FlightSQL CommandStatementQuery
let (schema, ticket) = if let Some(cmd) = try_unpack::<CommandStatementQuery>(&msg)? {
let CommandStatementQuery { query } = cmd;
debug!(%namespace_name, %query, "Handling FlightSQL CommandStatementQuery");
let ctx = db.new_query_context(span_ctx);
let schema = Planner::new(&ctx)
.flight_sql_get_flight_info(&namespace_name, msg.clone())
.await
.context(PlanningSnafu);
// TODO is supposed to return a schema -- if clients
// actually expect the schema we'll have to plan the query
// here.
let schema = vec![];
// Create a ticket that can be passed to do_get to run the query
let ticket = IoxGetRequest::new(namespace_name, RunQuery::Sql(query))
.try_encode()
.context(InternalCreatingTicketSnafu)?;
(schema, ticket)
if let Err(e) = &schema {
info!(%namespace_name, %type_url, %trace, %e, "Error running GetFlightInfo");
} else {
return Err(Error::unsupported_message_type(format!(
"Unsupported cmd message: {}",
msg.type_url
)));
debug!(%namespace_name, %type_url, %trace, "Completed GetFlightInfo request");
};
let schema = schema?;
// form the response
// Form the response ticket (that the client will pass back to DoGet)
let ticket = IoxGetRequest::new(&namespace_name, RunQuery::FlightSql(msg))
.try_encode()
.context(InternalCreatingTicketSnafu)?;
// Arrow says "set to -1 if not known
// Flight says "Set these to -1 if unknown."
//
// https://github.com/apache/arrow-rs/blob/a0a5880665b1836890f6843b6b8772d81c463351/format/Flight.proto#L274-L276
let total_records = -1;
let total_bytes = -1;
@ -454,13 +505,65 @@ where
location: vec![],
}];
Ok(FlightInfo {
schema: schema.into(),
let flight_info = FlightInfo {
schema,
// return descriptor we were passed
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records,
total_bytes,
})
};
Ok(tonic::Response::new(flight_info))
}
async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented: do_put"))
}
async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, tonic::Status> {
Err(tonic::Status::unimplemented(
"Not yet implemented: do_action",
))
}
async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
Err(tonic::Status::unimplemented(
"Not yet implemented: list_actions",
))
}
async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
Err(tonic::Status::unimplemented(
"Not yet implemented: do_exchange",
))
}
}
/// Extracts an encoded Protobuf message from a [`FlightDescriptor`],
/// as used in FlightSQL.
fn msg_from_descriptor(flight_descriptor: FlightDescriptor) -> Result<Any> {
match flight_descriptor.r#type() {
DescriptorType::Cmd => {
let msg: Any = Message::decode(flight_descriptor.cmd).context(DeserializationSnafu)?;
Ok(msg)
}
DescriptorType::Path => Err(Error::unsupported_message_type("FlightInfo with Path")),
DescriptorType::Unknown => Err(Error::unsupported_message_type(
"FlightInfo of unknown type",
)),
}
}

View File

@ -1,11 +1,11 @@
//! Ticket handling for the native IOx Flight API
use arrow_flight::Ticket;
use arrow_flight::{sql::Any, Ticket};
use bytes::Bytes;
use generated_types::influxdata::iox::querier::v1 as proto;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
use observability_deps::tracing::trace;
use prost::Message;
use prost::{DecodeError, Message};
use serde::Deserialize;
use snafu::Snafu;
use std::fmt::{Debug, Display, Formatter};
@ -17,24 +17,27 @@ pub enum Error {
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Flight requests to the IOx Flight DoGet endpoint contain a
/// serialized `Ticket` which describes the request.
/// This is the structure of the opaque tickets used for requests to
/// the IOx Flight DoGet endpoint
///
/// This structure encapsulates the deserialization (and eventual
/// serializing) logic for these requests
#[derive(Debug, PartialEq, Clone, Eq)]
/// This structure encapsulates the deserialization and serialization
/// logic for these requests
#[derive(Debug, PartialEq, Clone)]
pub struct IoxGetRequest {
namespace_name: String,
query: RunQuery,
}
/// The kind of query carried by an [`IoxGetRequest`].
#[derive(Debug, PartialEq, Clone, Eq)]
#[derive(Debug, PartialEq, Clone)]
pub enum RunQuery {
/// Unparameterized SQL query
Sql(String),
/// InfluxQL query text
InfluxQL(String),
// Coming Soon (Prepared Statement support)
/// Execute a FlightSQL command. The payload is an encoded
/// FlightSQL `Command*` message that was received at the
/// `get_flight_info` endpoint.
FlightSql(Any),
}
impl Display for RunQuery {
@ -42,6 +45,7 @@ impl Display for RunQuery {
match self {
Self::Sql(s) => Display::fmt(s, f),
Self::InfluxQL(s) => Display::fmt(s, f),
Self::FlightSql(s) => write!(f, "FlightSql({})", s.type_url),
}
}
}
@ -58,11 +62,11 @@ impl IoxGetRequest {
/// try to decode a ReadInfo structure from a Token
pub fn try_decode(ticket: Ticket) -> Result<Self> {
// decode ticket
IoxGetRequest::decode_protobuf(&ticket.ticket)
IoxGetRequest::decode_protobuf(ticket.ticket.clone())
.or_else(|e| {
trace!(%e, ticket=%String::from_utf8_lossy(&ticket.ticket),
"Error decoding ticket as ProtoBuf, trying as JSON");
IoxGetRequest::decode_json(&ticket.ticket)
IoxGetRequest::decode_json(ticket.ticket.clone())
})
.map_err(|e| {
trace!(%e, "Error decoding ticket as JSON");
@ -82,15 +86,21 @@ impl IoxGetRequest {
namespace_name,
sql_query,
query_type: QueryType::Sql.into(),
flightsql_command: vec![],
},
RunQuery::InfluxQL(influxql) => proto::ReadInfo {
namespace_name,
// field name is misleading
sql_query: influxql,
query_type: QueryType::InfluxQl.into(),
flightsql_command: vec![],
},
RunQuery::FlightSql(flightsql_command) => proto::ReadInfo {
namespace_name,
sql_query: "".into(),
query_type: QueryType::FlightSqlMessage.into(),
flightsql_command: flightsql_command.encode_to_vec(),
},
RunQuery::InfluxQL(influxql) => {
proto::ReadInfo {
namespace_name,
// field name is misleading
sql_query: influxql,
query_type: QueryType::InfluxQl.into(),
}
}
};
let ticket = read_info.encode_to_vec();
@ -107,7 +117,7 @@ impl IoxGetRequest {
///
/// Go clients are unable to execute InfluxQL queries until the JSON structure is updated
/// accordingly.
fn decode_json(ticket: &[u8]) -> Result<Self, String> {
fn decode_json(ticket: Bytes) -> Result<Self, String> {
let json_str = String::from_utf8(ticket.to_vec()).map_err(|_| "Not UTF8".to_string())?;
#[derive(Deserialize, Debug)]
@ -128,21 +138,45 @@ impl IoxGetRequest {
})
}
/// Decodes a Protobuf-encoded `proto::ReadInfo` ticket into an [`IoxGetRequest`].
///
/// The decoded `query_type` selects the [`RunQuery`] variant. Arms that
/// validate their input return a `DecodeError` when the field belonging
/// to a different query type is non-empty.
fn decode_protobuf(ticket: &[u8]) -> Result<Self, prost::DecodeError> {
let read_info = proto::ReadInfo::decode(Bytes::from(ticket.to_vec()))?;
fn decode_protobuf(ticket: Bytes) -> Result<Self, DecodeError> {
let read_info = proto::ReadInfo::decode(ticket)?;
// Read the typed enum through the accessor before `read_info` is
// destructured (and therefore moved) below.
let query_type = read_info.query_type();
let proto::ReadInfo {
namespace_name,
sql_query,
// Already captured above via `query_type()`.
query_type: _,
flightsql_command,
} = read_info;
Ok(Self {
namespace_name,
query: match query_type {
QueryType::Unspecified | QueryType::Sql => RunQuery::Sql(sql_query),
QueryType::InfluxQl => RunQuery::InfluxQL(sql_query),
// An unspecified query type is treated as plain SQL.
QueryType::Unspecified | QueryType::Sql => {
// A SQL ticket must not also carry a FlightSQL command.
if !flightsql_command.is_empty() {
return Err(DecodeError::new(
"QueryType::Sql contained non empty flightsql_command",
));
}
RunQuery::Sql(sql_query)
}
QueryType::InfluxQl => {
// An InfluxQL ticket must not also carry a FlightSQL command.
if !flightsql_command.is_empty() {
return Err(DecodeError::new(
"QueryType::InfluxQl contained non empty flightsql_command",
));
}
// The InfluxQL text travels in the `sql_query` field.
RunQuery::InfluxQL(sql_query)
}
QueryType::FlightSqlMessage => {
// A FlightSQL ticket must not also carry query text.
if !sql_query.is_empty() {
return Err(DecodeError::new(
"QueryType::FlightSqlMessage contained non empty sql_query",
));
}
// The command bytes are themselves an encoded Protobuf message
// (the payload type of `RunQuery::FlightSql`).
let cmd = prost::Message::decode(flightsql_command.as_ref())?;
RunQuery::FlightSql(cmd)
}
},
})
}
@ -158,6 +192,7 @@ impl IoxGetRequest {
#[cfg(test)]
mod tests {
use arrow_flight::sql::CommandStatementQuery;
use assert_matches::assert_matches;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
@ -192,6 +227,7 @@ mod tests {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Unspecified.into(),
flightsql_command: vec![],
});
// Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL
@ -206,6 +242,7 @@ mod tests {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Sql.into(),
flightsql_command: vec![],
});
let ri = IoxGetRequest::try_decode(ticket).unwrap();
@ -219,6 +256,7 @@ mod tests {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::InfluxQl.into(),
flightsql_command: vec![],
});
let ri = IoxGetRequest::try_decode(ticket).unwrap();
@ -231,7 +269,8 @@ mod tests {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".into(),
query_type: 3, // not a known query type
query_type: 42, // not a known query type
flightsql_command: vec![],
});
// Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL
@ -240,6 +279,48 @@ mod tests {
assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1"));
}
/// A `QueryType::Sql` ticket that also carries FlightSQL command bytes
/// must fail to decode.
#[test]
fn proto_ticket_decoding_sql_too_many_fields() {
    // can't have both sql_query and flightsql
    let read_info = proto::ReadInfo {
        namespace_name: String::from("<foo>_<bar>"),
        sql_query: String::from("SELECT 1"),
        query_type: QueryType::Sql.into(),
        flightsql_command: vec![1, 2, 3],
    };

    let decoded = IoxGetRequest::try_decode(make_proto_ticket(&read_info));
    assert_matches!(decoded.unwrap_err(), Error::Invalid);
}
/// A `QueryType::InfluxQl` ticket that also carries FlightSQL command
/// bytes must fail to decode.
#[test]
fn proto_ticket_decoding_influxql_too_many_fields() {
    // can't have both sql_query and flightsql
    let read_info = proto::ReadInfo {
        namespace_name: String::from("<foo>_<bar>"),
        sql_query: String::from("SELECT 1"),
        query_type: QueryType::InfluxQl.into(),
        flightsql_command: vec![1, 2, 3],
    };

    let decoded = IoxGetRequest::try_decode(make_proto_ticket(&read_info));
    assert_matches!(decoded.unwrap_err(), Error::Invalid);
}
/// A `QueryType::FlightSqlMessage` ticket that also carries query text
/// must fail to decode.
#[test]
fn proto_ticket_decoding_flightsql_too_many_fields() {
    // can't have both sql_query and flightsql
    let read_info = proto::ReadInfo {
        namespace_name: String::from("<foo>_<bar>"),
        sql_query: String::from("SELECT 1"),
        query_type: QueryType::FlightSqlMessage.into(),
        flightsql_command: vec![1, 2, 3],
    };

    let decoded = IoxGetRequest::try_decode(make_proto_ticket(&read_info));
    assert_matches!(decoded.unwrap_err(), Error::Invalid);
}
#[test]
fn proto_ticket_decoding_error() {
let ticket = Ticket {
@ -279,6 +360,25 @@ mod tests {
assert_eq!(request, roundtripped)
}
/// Encoding a FlightSQL request into a ticket and decoding that ticket
/// again must yield an equal request.
#[test]
fn round_trip_flightsql() {
    let statement = CommandStatementQuery {
        query: "select * from foo".into(),
    };
    let request = IoxGetRequest {
        namespace_name: "foo_blarg".into(),
        query: RunQuery::FlightSql(Any::pack(&statement).unwrap()),
    };

    // Encode a clone so the original stays available for the comparison.
    let encoded = request.clone().try_encode().expect("encoding failed");
    let decoded = IoxGetRequest::try_decode(encoded).expect("decode failed");

    assert_eq!(request, decoded)
}
fn make_proto_ticket(read_info: &proto::ReadInfo) -> Ticket {
Ticket {
ticket: read_info.encode_to_vec().into(),