diff --git a/Cargo.lock b/Cargo.lock index 7a9496b926..37de17a2c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1790,6 +1790,24 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flightsql" +version = "0.1.0" +dependencies = [ + "arrow", + "arrow-flight", + "bytes", + "datafusion", + "futures", + "iox_query", + "observability_deps", + "prost 0.11.6", + "snafu", + "tokio", + "tonic", + "workspace-hack", +] + [[package]] name = "float-cmp" version = "0.9.0" @@ -5103,8 +5121,11 @@ dependencies = [ name = "service_common" version = "0.1.0" dependencies = [ + "arrow-flight", "async-trait", + "bytes", "datafusion", + "flightsql", "iox_query", "metric", "parking_lot 0.12.1", @@ -5141,13 +5162,13 @@ dependencies = [ "bytes", "data_types", "datafusion", + "flightsql", "futures", "generated_types", "iox_query", "metric", "observability_deps", "prost 0.11.6", - "prost-types 0.11.6", "serde", "serde_json", "service_common", diff --git a/Cargo.toml b/Cargo.toml index 78177c5e2c..d0a786f760 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,9 @@ members = [ "datafusion_util", "dml", "executor", + "flightsql", "generated_types", + "garbage_collector", "grpc-binary-logger-proto", "grpc-binary-logger-test-proto", "grpc-binary-logger", @@ -29,7 +31,6 @@ members = [ "ingester2", "iox_catalog", "iox_data_generator", - "garbage_collector", "iox_query", "iox_tests", "iox_time", diff --git a/flightsql/Cargo.toml b/flightsql/Cargo.toml new file mode 100644 index 0000000000..7bd4ffa6d3 --- /dev/null +++ b/flightsql/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "flightsql" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +arrow = { workspace = true, features = ["prettyprint"] } +arrow-flight = { workspace = true, features = ["flight-sql-experimental"] } +datafusion = { workspace = true } +observability_deps = { path = "../observability_deps" } +iox_query = { path = "../iox_query" } + +# Crates.io dependencies, in alphabetical order +bytes = "1.3" +futures = "0.3" +snafu = "0.7" +prost = "0.11" +tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } +tonic = "0.8" +workspace-hack = { path = "../workspace-hack"} \ No newline at end of file diff --git a/flightsql/src/lib.rs b/flightsql/src/lib.rs new file mode 100644 index 0000000000..febf465db4 --- /dev/null +++ b/flightsql/src/lib.rs @@ -0,0 +1,4 @@ +//! InfluxDB IOx implementation of FlightSQL +mod planner; + +pub use planner::{Error, FlightSQLPlanner, Result}; diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs new file mode 100644 index 0000000000..e615eae9a6 --- /dev/null +++ b/flightsql/src/planner.rs @@ -0,0 +1,169 @@ +//! FlightSQL handling +use std::{string::FromUtf8Error, sync::Arc}; + +use arrow::{error::ArrowError, ipc::writer::IpcWriteOptions}; +use arrow_flight::{ + error::FlightError, + sql::{Any, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt}, + IpcMessage, SchemaAsIpc, +}; +use bytes::Bytes; +use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; +use iox_query::{exec::IOxSessionContext, QueryNamespace}; +use observability_deps::tracing::debug; +use prost::Message; +use snafu::{ResultExt, Snafu}; + +#[allow(clippy::enum_variant_names)] +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Invalid protobuf for type_url '{}': {}", type_url, source))] + DeserializationTypeKnown { + type_url: String, + source: prost::DecodeError, + }, + + #[snafu(display("Query was not valid UTF-8: {}", source))] + InvalidUtf8 { source: FromUtf8Error }, + + #[snafu(display("{}", source))] + Flight { source: FlightError }, + + #[snafu(display("{}", source))] + DataFusion { source: DataFusionError }, + + #[snafu(display("{}", source))] + Arrow { source: ArrowError }, + + #[snafu(display("Unsupported FlightSQL message type: {}", description))] + UnsupportedMessageType { description: String }, +} + +pub type Result = std::result::Result; + +impl From for Error { + fn from(value: FlightError) -> Self { + Self::Flight { source: value } + } +} + +impl From for DataFusionError { + fn from(value: Error) -> Self { + match value { + Error::DataFusion { source } => source, + Error::Arrow { source } => DataFusionError::ArrowError(source), + value => DataFusionError::External(Box::new(value)), + } + } +} + +/// Logic for creating plans for various Flight messages against a query database +#[derive(Debug, Default)] +pub struct FlightSQLPlanner {} + +impl FlightSQLPlanner { + pub fn new() -> Self { + Self {} + } + + /// Returns the schema, in Arrow IPC encoded form, for the request in msg. + pub async fn get_flight_info( + namespace_name: impl Into, + msg: Any, + ctx: &IOxSessionContext, + ) -> Result { + let namespace_name = namespace_name.into(); + debug!(%namespace_name, type_url=%msg.type_url, "Handling flightsql get_flight_info"); + + match FlightSQLCommand::try_new(&msg)? { + FlightSQLCommand::CommandStatementQuery(query) + | FlightSQLCommand::CommandPreparedStatementQuery(query) => { + Self::get_schema_for_query(&query, ctx).await + } + } + } + + /// Return the schema for the specified query + /// + /// returns: IPC encoded (schema_bytes) for this query + async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result { + // gather real schema, but only + let logical_plan = ctx.plan_sql(query).await.context(DataFusionSnafu)?; + let schema = arrow::datatypes::Schema::from(logical_plan.schema().as_ref()); + let options = IpcWriteOptions::default(); + + // encode the schema into the correct form + let IpcMessage(schema) = SchemaAsIpc::new(&schema, &options) + .try_into() + .context(ArrowSnafu)?; + + Ok(schema) + } + + /// Returns a plan that computes results requested in msg + pub async fn do_get( + namespace_name: impl Into, + _database: Arc, + msg: Any, + ctx: &IOxSessionContext, + ) -> Result> { + let namespace_name = namespace_name.into(); + debug!(%namespace_name, type_url=%msg.type_url, "Handling flightsql plan to run an actual query"); + + match FlightSQLCommand::try_new(&msg)? { + FlightSQLCommand::CommandStatementQuery(query) => { + debug!(%query, "Planning FlightSQL query"); + ctx.prepare_sql(&query).await.context(DataFusionSnafu) + } + FlightSQLCommand::CommandPreparedStatementQuery(query) => { + debug!(%query, "Planning FlightSQL prepared query"); + ctx.prepare_sql(&query).await.context(DataFusionSnafu) + } + } + } +} + +/// Decoded and validated FlightSQL command +#[derive(Debug, Clone)] +enum FlightSQLCommand { + CommandStatementQuery(String), + CommandPreparedStatementQuery(String), +} + +impl FlightSQLCommand { + /// Figure out and decode the specific FlightSQL command in `msg` + fn try_new(msg: &Any) -> Result { + if let Some(decoded_cmd) = try_unpack::(msg)? { + let CommandStatementQuery { query } = decoded_cmd; + Ok(Self::CommandStatementQuery(query)) + } else if let Some(decoded_cmd) = try_unpack::(msg)? { + let CommandPreparedStatementQuery { + prepared_statement_handle, + } = decoded_cmd; + + // handle should be a decoded query + let query = + String::from_utf8(prepared_statement_handle.to_vec()).context(InvalidUtf8Snafu)?; + Ok(Self::CommandPreparedStatementQuery(query)) + } else { + UnsupportedMessageTypeSnafu { + description: &msg.type_url, + } + .fail() + } + } +} + +/// try to unpack the [`arrow_flight::sql::Any`] as type `T`, returning Ok(None) if +/// the type is wrong or Err if an error occurs +fn try_unpack(msg: &Any) -> Result> { + // Does the type URL match? + if T::type_url() != msg.type_url { + return Ok(None); + } + // type matched, so try and decode + let m = Message::decode(&*msg.value).context(DeserializationTypeKnownSnafu { + type_url: &msg.type_url, + })?; + Ok(Some(m)) +} diff --git a/generated_types/protos/influxdata/iox/querier/v1/flight.proto b/generated_types/protos/influxdata/iox/querier/v1/flight.proto index 0424ab7091..7b183e4682 100644 --- a/generated_types/protos/influxdata/iox/querier/v1/flight.proto +++ b/generated_types/protos/influxdata/iox/querier/v1/flight.proto @@ -3,38 +3,49 @@ package influxdata.iox.querier.v1; option go_package = "github.com/influxdata/iox/querier/v1"; /* - * Message definitions for the InfluxDB IOx Flight API + * Message definition for the native InfluxDB IOx Flight API * - * These messages are what is sent to/from an InfluxDB IOx server's - * `DoGet` endpoint as the opaque "Ticket" in Arrow Flight messages. + * ReadInfo is sent to an InfluxDB IOx Querier server's `DoGet` RPC + * method as the opaque "Ticket" in Arrow Flight messages. * - * The bytes for the Tickets are created by encoding these messages - * using the protobuf binary format. + * Tickets are created by encoding these messages using the protobuf + * binary format. * - * Clients can construct these Ticket's directly to avoid making two - * requests to run each query + * IOx clients can construct these Tickets directly to avoid making + * two RPC requests as typically required by Arrow Flight (a + * `GetFlightInfo` followed by a `DoGet`). */ - - -// Request for an IOx querier to execute a query on a user's behalf. message ReadInfo { // Namespace name. string namespace_name = 1; - // query text (either SQL or InfluxQL, depending on query_type) + // Query text (either SQL or InfluxQL, depending on query_type) string sql_query = 2; + // A FlightSQL command payload (serialized protobuf bytes). One of + // the messages defined in the [protobuf definition]. + // + // [protobuf definition]: https://arrow.apache.org/docs/format/FlightSql.html#protocol-buffer-definitions + bytes flightsql_command = 4; + // The type of query QueryType query_type = 3; enum QueryType { // An unspecified query type. IOx may choose how to interpret sql_query. QUERY_TYPE_UNSPECIFIED = 0; - // SQL query. + + // SQL query. `sql_query` contains a SQL query as text QUERY_TYPE_SQL = 1; - // InfluxQL query. + + // InfluxQL query. `sql_query` contains an InfluxQL query as text QUERY_TYPE_INFLUX_QL = 2; + + // FlightSQL message: `sql_query` is empty, flightsql_command + // contains a serialized FlightSQL message. + QUERY_TYPE_FLIGHT_SQL_MESSAGE = 3; } + } // Message included in the DoGet response from the querier diff --git a/influxdb_iox_client/src/client/flight/mod.rs b/influxdb_iox_client/src/client/flight/mod.rs index 4533e437d2..557b28fa8b 100644 --- a/influxdb_iox_client/src/client/flight/mod.rs +++ b/influxdb_iox_client/src/client/flight/mod.rs @@ -180,6 +180,7 @@ impl Client { namespace_name, sql_query, query_type: QueryType::Sql.into(), + flightsql_command: vec![], }; self.do_get_with_read_info(request).await @@ -196,6 +197,7 @@ impl Client { namespace_name, sql_query: influxql_query, query_type: QueryType::InfluxQl.into(), + flightsql_command: vec![], }; self.do_get_with_read_info(request).await diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 0cddc4f6c6..9e7419037e 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -312,16 +312,20 @@ impl IOxSessionContext { &self.inner } + /// Plan a SQL statement. This assumes that any tables referenced + /// in the SQL have been registered with this context. Use + /// `prepare_sql` to actually execute the query. + pub async fn plan_sql(&self, sql: &str) -> Result { + let ctx = self.child_ctx("plan_sql"); + debug!(text=%sql, "planning SQL query"); + // NOTE can not use ctx.inner.sql() here as it also interprets DDL + ctx.inner.state().create_logical_plan(sql).await + } + /// Prepare a SQL statement for execution. This assumes that any /// tables referenced in the SQL have been registered with this context pub async fn prepare_sql(&self, sql: &str) -> Result> { - let ctx = self.child_ctx("prepare_sql"); - debug!(text=%sql, "planning SQL query"); - - // NOTE can not use ctx.inner.sql() here as it also interprets DDL - #[allow(deprecated)] - let logical_plan = ctx.inner.state().create_logical_plan(sql).await?; - debug!(plan=%logical_plan.display_graphviz(), "logical plan"); + let logical_plan = self.plan_sql(sql).await?; // Make nicer erorrs for unsupported SQL // (By default datafusion returns Internal Error) @@ -347,6 +351,7 @@ impl IOxSessionContext { _ => (), } + let ctx = self.child_ctx("prepare_sql"); ctx.create_physical_plan(&logical_plan).await } diff --git a/service_common/Cargo.toml b/service_common/Cargo.toml index 10dea53510..5d27fb24a4 100644 --- a/service_common/Cargo.toml +++ b/service_common/Cargo.toml @@ -7,8 +7,11 @@ license.workspace = true [dependencies] # In alphabetical order async-trait = "0.1.63" +arrow-flight = { workspace = true, features = ["flight-sql-experimental"] } +bytes = "1.3" datafusion = { workspace = true } iox_query = { path = "../iox_query" } +flightsql = { path = "../flightsql" } metric = { path = "../metric" } parking_lot = "0.12" predicate = { path = "../predicate" } diff --git a/service_common/src/planner.rs b/service_common/src/planner.rs index 8f5b439367..cc22c4b87d 100644 --- a/service_common/src/planner.rs +++ b/service_common/src/planner.rs @@ -1,7 +1,10 @@ //! Query planner wrapper for use in IOx services use std::sync::Arc; -use datafusion::physical_plan::ExecutionPlan; +use arrow_flight::sql::Any; +use bytes::Bytes; +use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; +use flightsql::FlightSQLPlanner; use iox_query::{ exec::IOxSessionContext, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, @@ -17,7 +20,7 @@ use predicate::rpc_predicate::InfluxRpcPredicate; /// /// Query planning was, at time of writing, a single threaded /// affair. In order to avoid tying up the tokio executor that is -/// handling API requests, we plan queries using a separate thread +/// handling API requests, IOx plan queries using a separate thread /// pool. pub struct Planner { /// Executors (whose threadpool to use) @@ -60,6 +63,50 @@ impl Planner { .await } + /// Creates a plan for a `DoGet` FlightSQL message, + /// as described on [`FlightSQLPlanner::do_get`], on a + /// separate threadpool + pub async fn flight_sql_do_get( + &self, + namespace_name: impl Into, + namespace: Arc, + msg: Any, + ) -> Result> + where + N: QueryNamespace + 'static, + { + let namespace_name = namespace_name.into(); + let ctx = self.ctx.child_ctx("planner flight_sql_do_get"); + + self.ctx + .run(async move { + FlightSQLPlanner::do_get(namespace_name, namespace, msg, &ctx) + .await + .map_err(DataFusionError::from) + }) + .await + } + + /// Creates the response for a `GetFlightInfo` FlightSQL message + /// as described on [`FlightSQLPlanner::get_flight_info`], on a + /// separate threadpool. + pub async fn flight_sql_get_flight_info( + &self, + namespace_name: impl Into, + msg: Any, + ) -> Result { + let namespace_name = namespace_name.into(); + let ctx = self.ctx.child_ctx("planner flight_sql_get_flight_info"); + + self.ctx + .run(async move { + FlightSQLPlanner::get_flight_info(namespace_name, msg, &ctx) + .await + .map_err(DataFusionError::from) + }) + .await + } + /// Creates a plan as described on /// [`InfluxRpcPlanner::table_names`], on a separate threadpool pub async fn table_names( diff --git a/service_grpc_flight/Cargo.toml b/service_grpc_flight/Cargo.toml index 96e641ce2c..cddea09427 100644 --- a/service_grpc_flight/Cargo.toml +++ b/service_grpc_flight/Cargo.toml @@ -10,9 +10,9 @@ license.workspace = true arrow_util = { path = "../arrow_util" } data_types = { path = "../data_types" } datafusion = { workspace = true } +flightsql = { path = "../flightsql" } generated_types = { path = "../generated_types" } observability_deps = { path = "../observability_deps" } -prost-types = { version = "0.11", features = ["std"] } iox_query = { path = "../iox_query" } service_common = { path = "../service_common" } trace = { path = "../trace"} diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 9724f2a9fb..7a9a7acca4 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -1,4 +1,5 @@ -//! Implements the InfluxDB IOx Flight API using Arrow Flight and gRPC +//! Implements the InfluxDB IOx Flight API and Arrow FlightSQL, based +//! on Arrow Flight and gRPC. See [`FlightService`] for full detail. mod request; @@ -11,7 +12,7 @@ use arrow_flight::{ error::FlightError, flight_descriptor::DescriptorType, flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer}, - sql::{CommandStatementQuery, ProstMessageExt}, + sql::Any, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; @@ -28,7 +29,7 @@ use observability_deps::tracing::{debug, info, warn}; use prost::Message; use request::{IoxGetRequest, RunQuery}; use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider}; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant}; use tonic::{Request, Response, Streaming}; use trace::{ctx::SpanContext, span::SpanExt}; @@ -51,11 +52,8 @@ pub enum Error { #[snafu(display("Internal creating encoding ticket: {}", source))] InternalCreatingTicket { source: request::Error }, - #[snafu(display("Invalid query, could not parse '{}': {}", query, source))] - InvalidQuery { - query: String, - source: serde_json::Error, - }, + #[snafu(display("Invalid handshake. No payload provided"))] + InvalidHandshake {}, #[snafu(display("Namespace {} not found", namespace_name))] NamespaceNotFound { namespace_name: String }, @@ -89,18 +87,12 @@ pub enum Error { source: service_common::planner::Error, }, - #[snafu(display("Error during protobuf serialization: {}", source))] - Serialization { source: prost::EncodeError }, + #[snafu(display("Error while planning FlightSQL : {}", source))] + FlightSQLPlanning { source: flightsql::Error }, #[snafu(display("Invalid protobuf: {}", source))] Deserialization { source: prost::DecodeError }, - #[snafu(display("Invalid protobuf for type_url'{}': {}", type_url, source))] - DeserializationTypeKnown { - type_url: String, - source: prost::DecodeError, - }, - #[snafu(display("Unsupported message type: {}", description))] UnsupportedMessageType { description: String }, } @@ -116,7 +108,7 @@ impl From for tonic::Status { match err { Error::NamespaceNotFound { .. } | Error::InvalidTicket { .. } - | Error::InvalidQuery { .. } + | Error::InvalidHandshake { .. } // TODO(edd): this should be `debug`. Keeping at info while IOx in early development | Error::InvalidNamespaceName { .. } => info!(e=%err, msg), Error::Query { .. } => info!(e=%err, msg), @@ -124,11 +116,10 @@ impl From for tonic::Status { |Error::NoNamespaceHeader |Error::InvalidNamespaceHeader { .. } | Error::Planning { .. } - | Error::Serialization { .. } | Error::Deserialization { .. } - | Error::DeserializationTypeKnown { .. } | Error::InternalCreatingTicket { .. } | Error::UnsupportedMessageType { .. } + | Error::FlightSQLPlanning { .. } => { warn!(e=%err, msg) } @@ -146,10 +137,8 @@ impl Error { let code = match self { Self::NamespaceNotFound { .. } => tonic::Code::NotFound, Self::InvalidTicket { .. } - | Self::InvalidQuery { .. } - | Self::Serialization { .. } + | Self::InvalidHandshake { .. } | Self::Deserialization { .. } - | Self::DeserializationTypeKnown { .. } | Self::NoNamespaceHeader | Self::InvalidNamespaceHeader { .. } | Self::InvalidNamespaceName { .. } => tonic::Code::InvalidArgument, @@ -157,6 +146,18 @@ impl Error { datafusion_error_to_tonic_code(&source) } Self::UnsupportedMessageType { .. } => tonic::Code::Unimplemented, + Error::FlightSQLPlanning { source } => match source { + flightsql::Error::DeserializationTypeKnown { .. } + | flightsql::Error::InvalidUtf8 { .. } + | flightsql::Error::UnsupportedMessageType { .. } => tonic::Code::InvalidArgument, + flightsql::Error::Flight { source: e } => return tonic::Status::from(e), + fs_err @ flightsql::Error::Arrow { .. } => { + // wrap in Datafusion error to walk source stacks + let df_error = DataFusionError::from(fs_err); + datafusion_error_to_tonic_code(&df_error) + } + flightsql::Error::DataFusion { source } => datafusion_error_to_tonic_code(&source), + }, Self::InternalCreatingTicket { .. } | Self::Optimize { .. } => tonic::Code::Internal, }; @@ -172,7 +173,105 @@ impl Error { type TonicStream = Pin> + Send + 'static>>; -/// Concrete implementation of the gRPC Arrow Flight Service API +/// Concrete implementation of the IOx client protocol, implemented as +/// a gRPC [Arrow Flight] Service API +/// +/// # Tickets +/// +/// Creating and serializing the `Ticket` structure used in IOx Arrow +/// Flight API is handled by [`IoxGetRequest`]. See that for more +/// details. +/// +/// # Native IOx API ad-hoc query +/// +/// To run a query with the native IOx API, a client needs to +/// +/// 1. Encode the query string as a `Ticket` (see [`IoxGetRequest`]). +/// +/// 2. Call the `DoGet` method with the `Ticket`, +/// +/// 2. Recieve a stream of data encoded as [`FlightData`] +/// +/// ```text +/// .───────. +/// ╔═══════════╗ ( ) +/// ║ ║ │`───────'│ +/// ║ Client ║ │ IOx │ +/// ║ ║ │.───────.│ +/// ║ ║ ( ) +/// ╚═══════════╝ `───────' +/// ┃ Creates a ┃ +/// 1 ┃ Ticket ┃ +/// ┃ ┃ +/// ┃ ┃ +/// 2 ┃ DoGet(Ticket) ┃ +/// ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃ +/// ┃ ┃ +/// ┃ Stream of FightData ┃ +/// 3 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃ +/// ``` +/// +/// # FlightSQL +/// +/// IOx also supports [Arrow FlightSQL]. In addition to `DoGet`, +/// FlightSQL clients call additional Arrow Flight RPC methods such as +/// `GetFlightInfo`, `GetSchema`, `DoPut`, and `DoAction`. +/// +/// ## FlightSQL List tables (NOT YET IMPLEMENTED) +/// +/// TODO sequence diagram for List Tables +/// +/// ## FlightSQL ad-hoc query +/// +/// To run an ad-hoc query, via FlightSQL, the client needs to +/// +/// 1. Encode the query in a `CommandStatementQuery` FlightSQL +/// structure in a [`FlightDescriptor`] +/// +/// 2. Call the `GetFlightInfo` method with the the [`FlightDescriptor`] +/// +/// 3. Receive a `Ticket` in the returned [`FlightInfo`]. The Ticket is +/// opaque (uninterpreted) by the client. It contains an +/// [`IoxGetRequest`] with the `CommandStatementQuery` request. +/// +/// 4. Calls the `DoGet` method with the `Ticket` from the previous step. +/// +/// 5. Recieve a stream of data encoded as [`FlightData`] +/// +/// ```text +/// .───────. +/// ╔═══════════╗ ( ) +/// ║ ║ │`───────'│ +/// ║ FlightSQL ║ │ IOx │ +/// ║ Client ║ │.───────.│ +/// ║ ║ ( ) +/// ╚═══════════╝ `───────' +/// ┃ Creates a ┃ +/// 1 ┃ CommandStatementQuery ┃ +/// ┃ ┃ +/// ┃ ┃ +/// ┃ ┃ +/// 2 ┃ GetFlightInfo(CommandStatementQuery) ┃ +/// ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃ +/// ┃ FlightInfo{..Ticket{ ┃ +/// ┃ CommandStatementQuery ┃ +/// 3 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃ +/// ┃ ┃ +/// ┃ ┃ +/// ┃ DoGet(Ticket) ┃ +/// 4 ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃ +/// ┃ ┃ +/// ┃ Stream of FightData ┃ +/// 5 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃ +/// ┃ ┃ +/// ``` +/// +/// ## FlightSQL Prepared Statement (NOT YET IMPLEMENTED) +/// +/// TODO sequence diagram +/// +/// [Arrow Flight]: https://arrow.apache.org/docs/format/Flight.html +/// [Arrow FlightSQL]: https://arrow.apache.org/docs/format/FlightSql.html #[derive(Debug)] struct FlightService where @@ -192,7 +291,8 @@ impl FlightService where S: QueryNamespaceProvider, { - async fn run_query( + /// Implementation of the `DoGet` method + async fn run_do_get( &self, span_ctx: Option, permit: InstrumentedAsyncOwnedSemaphorePermit, @@ -223,6 +323,14 @@ where .context(PlanningSnafu)?; (token, plan) } + RunQuery::FlightSql(msg) => { + let token = db.record_query(&ctx, "flightsql", Box::new(msg.type_url.clone())); + let plan = Planner::new(&ctx) + .flight_sql_do_get(&namespace, db, msg.clone()) + .await + .context(PlanningSnafu)?; + (token, plan) + } }; let output = @@ -249,7 +357,9 @@ where &self, _request: Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) + Err(tonic::Status::unimplemented( + "Not yet implemented: get_schema", + )) } async fn do_get( @@ -279,17 +389,17 @@ where // Log after we acquire the permit and are about to start execution let start = Instant::now(); - info!(%namespace_name, %query, %trace, "Running SQL via flight do_get"); + info!(%namespace_name, %query, %trace, "DoGet request"); let response = self - .run_query(span_ctx, permit, query, namespace_name.to_string()) + .run_do_get(span_ctx, permit, query, namespace_name.to_string()) .await; if let Err(e) = &response { - info!(%namespace_name, %query, %trace, %e, "Error running SQL query"); + info!(%namespace_name, %query, %trace, %e, "Error running DoGet"); } else { let elapsed = Instant::now() - start; - debug!(%namespace_name, %query,%trace, ?elapsed, "Completed SQL query successfully"); + debug!(%namespace_name, %query, %trace, ?elapsed, "Completed DoGet request"); } response } @@ -298,7 +408,12 @@ where &self, request: Request>, ) -> Result, tonic::Status> { - let request = request.into_inner().message().await?.unwrap(); + let request = request + .into_inner() + .message() + .await? + .context(InvalidHandshakeSnafu)?; + let response = HandshakeResponse { protocol_version: request.protocol_version, payload: request.payload, @@ -311,20 +426,24 @@ where &self, _request: Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) + Err(tonic::Status::unimplemented( + "Not yet implemented: list_flights", + )) } - /// Handles requests encoded in the FlightDescriptor - /// - /// IOx currently only processes "cmd" type Descriptors (not - /// paths) and attempts to decodes the [`FlightDescriptor::cmd`] - /// bytes as an encoded protobuf message - /// + /// Handles `GetFlightInfo` RPC requests. The [`FlightDescriptor`] + /// is treated containing an FlightSQL command, encoded as a binary + /// ProtoBuf message. /// + /// see [`FlightService`] for more details. async fn get_flight_info( &self, request: Request, ) -> Result, tonic::Status> { + let external_span_ctx: Option = request.extensions().get().cloned(); + let span_ctx: Option = request.extensions().get().cloned(); + let trace = external_span_ctx.format_jaeger(); + // look for namespace information in headers let namespace_name = request .metadata() @@ -336,111 +455,43 @@ where }) .ok_or(Error::NoNamespaceHeader)??; - let request = request.into_inner(); + let flight_descriptor = request.into_inner(); - let cmd = match request.r#type() { - DescriptorType::Cmd => Ok(&request.cmd), - DescriptorType::Path => Err(Error::unsupported_message_type("FlightInfo with Path")), - DescriptorType::Unknown => Err(Error::unsupported_message_type( - "FlightInfo of unknown type", - )), - }?; + // extract the FlightSQL message + let msg = msg_from_descriptor(flight_descriptor.clone())?; - let message: prost_types::Any = - prost::Message::decode(cmd.as_ref()).context(DeserializationSnafu)?; + let type_url = &msg.type_url; + info!(%namespace_name, %type_url, %trace, "GetFlightInfo request"); - let flight_info = self.dispatch(&namespace_name, request, message).await?; - Ok(tonic::Response::new(flight_info)) - } - - async fn do_put( - &self, - _request: Request>, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn do_action( - &self, - _request: Request, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn list_actions( - &self, - _request: Request, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - async fn do_exchange( - &self, - _request: Request>, - ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } -} - -impl FlightService -where - S: QueryNamespaceProvider, -{ - /// Given a successfully decoded protobuf *Any* message, handles - /// recognized messages (e.g those defined by FlightSQL) and - /// creates the appropriate FlightData response - /// - /// Arguments - /// - /// namespace_name: is the target namespace of the request - /// - /// flight_descriptor: is the descriptor sent in the request (included in response) - /// - /// msg is the `cmd` field of the flight descriptor decoded as a protobuf message - async fn dispatch( - &self, - namespace_name: &str, - flight_descriptor: FlightDescriptor, - msg: prost_types::Any, - ) -> Result { - fn try_unpack(msg: &prost_types::Any) -> Result> { - // Does the type URL match? - if T::type_url() != msg.type_url { - return Ok(None); - } - // type matched, so try and decode - let m = prost::Message::decode(&*msg.value).context(DeserializationTypeKnownSnafu { - type_url: &msg.type_url, + let db = self + .server + .db(&namespace_name, span_ctx.child_span("get namespace")) + .await + .ok_or_else(|| { + tonic::Status::not_found(format!("Unknown namespace: {namespace_name}")) })?; - Ok(Some(m)) - } - // FlightSQL CommandStatementQuery - let (schema, ticket) = if let Some(cmd) = try_unpack::(&msg)? { - let CommandStatementQuery { query } = cmd; - debug!(%namespace_name, %query, "Handling FlightSQL CommandStatementQuery"); + let ctx = db.new_query_context(span_ctx); + let schema = Planner::new(&ctx) + .flight_sql_get_flight_info(&namespace_name, msg.clone()) + .await + .context(PlanningSnafu); - // TODO is supposed to return a schema -- if clients - // actually expect the schema we'll have to plan the query - // here. - let schema = vec![]; - - // Create a ticket that can be passed to do_get to run the query - let ticket = IoxGetRequest::new(namespace_name, RunQuery::Sql(query)) - .try_encode() - .context(InternalCreatingTicketSnafu)?; - - (schema, ticket) + if let Err(e) = &schema { + info!(%namespace_name, %type_url, %trace, %e, "Error running GetFlightInfo"); } else { - return Err(Error::unsupported_message_type(format!( - "Unsupported cmd message: {}", - msg.type_url - ))); + debug!(%namespace_name, %type_url, %trace, "Completed GetFlightInfo request"); }; + let schema = schema?; - // form the response + // Form the response ticket (that the client will pass back to DoGet) + let ticket = IoxGetRequest::new(&namespace_name, RunQuery::FlightSql(msg)) + .try_encode() + .context(InternalCreatingTicketSnafu)?; - // Arrow says "set to -1 if not known + // Flight says "Set these to -1 if unknown." + // + // https://github.com/apache/arrow-rs/blob/a0a5880665b1836890f6843b6b8772d81c463351/format/Flight.proto#L274-L276 let total_records = -1; let total_bytes = -1; @@ -454,13 +505,65 @@ where location: vec![], }]; - Ok(FlightInfo { - schema: schema.into(), + let flight_info = FlightInfo { + schema, + // return descriptor we were passed flight_descriptor: Some(flight_descriptor), endpoint, total_records, total_bytes, - }) + }; + + Ok(tonic::Response::new(flight_info)) + } + + async fn do_put( + &self, + _request: Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("Not yet implemented: do_put")) + } + + async fn do_action( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented( + "Not yet implemented: do_action", + )) + } + + async fn list_actions( + &self, + _request: Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented( + "Not yet implemented: list_actions", + )) + } + + async fn do_exchange( + &self, + _request: Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented( + "Not yet implemented: do_exchange", + )) + } +} + +/// Extracts an encoded Protobuf message from a [`FlightDescriptor`], +/// as used in FlightSQL. +fn msg_from_descriptor(flight_descriptor: FlightDescriptor) -> Result { + match flight_descriptor.r#type() { + DescriptorType::Cmd => { + let msg: Any = Message::decode(flight_descriptor.cmd).context(DeserializationSnafu)?; + Ok(msg) + } + DescriptorType::Path => Err(Error::unsupported_message_type("FlightInfo with Path")), + DescriptorType::Unknown => Err(Error::unsupported_message_type( + "FlightInfo of unknown type", + )), } } diff --git a/service_grpc_flight/src/request.rs b/service_grpc_flight/src/request.rs index 86b02ad398..043938a6b5 100644 --- a/service_grpc_flight/src/request.rs +++ b/service_grpc_flight/src/request.rs @@ -1,11 +1,11 @@ //! Ticket handling for the native IOx Flight API -use arrow_flight::Ticket; +use arrow_flight::{sql::Any, Ticket}; use bytes::Bytes; use generated_types::influxdata::iox::querier::v1 as proto; use generated_types::influxdata::iox::querier::v1::read_info::QueryType; use observability_deps::tracing::trace; -use prost::Message; +use prost::{DecodeError, Message}; use serde::Deserialize; use snafu::Snafu; use std::fmt::{Debug, Display, Formatter}; @@ -17,24 +17,27 @@ pub enum Error { } pub type Result = std::result::Result; -/// Flight requests to the IOx Flight DoGet endpoint contain a -/// serialized `Ticket` which describes the request. +/// This is the structure of the opaque tickets` used for requests to +/// IOx Flight DoGet endpoint /// -/// This structure encapsulates the deserialization (and eventual -/// serializing) logic for these requests -#[derive(Debug, PartialEq, Clone, Eq)] +/// This structure encapsulates the deserialization and serializion +/// logic for these requests +#[derive(Debug, PartialEq, Clone)] pub struct IoxGetRequest { namespace_name: String, query: RunQuery, } -#[derive(Debug, PartialEq, Clone, Eq)] +#[derive(Debug, PartialEq, Clone)] pub enum RunQuery { /// Unparameterized SQL query Sql(String), /// InfluxQL InfluxQL(String), - // Coming Soon (Prepared Statement support) + /// Execute a FlightSQL command. The payload is an encoded + /// FlightSql Command*. message that was received at the + /// get_flight_info endpoint + FlightSql(Any), } impl Display for RunQuery { @@ -42,6 +45,7 @@ impl Display for RunQuery { match self { Self::Sql(s) => Display::fmt(s, f), Self::InfluxQL(s) => Display::fmt(s, f), + Self::FlightSql(s) => write!(f, "FlightSql({})", s.type_url), } } } @@ -58,11 +62,11 @@ impl IoxGetRequest { /// try to decode a ReadInfo structure from a Token pub fn try_decode(ticket: Ticket) -> Result { // decode ticket - IoxGetRequest::decode_protobuf(&ticket.ticket) + IoxGetRequest::decode_protobuf(ticket.ticket.clone()) .or_else(|e| { trace!(%e, ticket=%String::from_utf8_lossy(&ticket.ticket), "Error decoding ticket as ProtoBuf, trying as JSON"); - IoxGetRequest::decode_json(&ticket.ticket) + IoxGetRequest::decode_json(ticket.ticket.clone()) }) .map_err(|e| { trace!(%e, "Error decoding ticket as JSON"); @@ -82,15 +86,21 @@ impl IoxGetRequest { namespace_name, sql_query, query_type: QueryType::Sql.into(), + flightsql_command: vec![], + }, + RunQuery::InfluxQL(influxql) => proto::ReadInfo { + namespace_name, + // field name is misleading + sql_query: influxql, + query_type: QueryType::InfluxQl.into(), + flightsql_command: vec![], + }, + RunQuery::FlightSql(flightsql_command) => proto::ReadInfo { + namespace_name, + sql_query: "".into(), + query_type: QueryType::FlightSqlMessage.into(), + flightsql_command: flightsql_command.encode_to_vec(), }, - RunQuery::InfluxQL(influxql) => { - proto::ReadInfo { - namespace_name, - // field name is misleading - sql_query: influxql, - query_type: QueryType::InfluxQl.into(), - } - } }; let ticket = read_info.encode_to_vec(); @@ -107,7 +117,7 @@ impl IoxGetRequest { /// /// Go clients are unable to execute InfluxQL queries until the JSON structure is updated /// accordingly. - fn decode_json(ticket: &[u8]) -> Result { + fn decode_json(ticket: Bytes) -> Result { let json_str = String::from_utf8(ticket.to_vec()).map_err(|_| "Not UTF8".to_string())?; #[derive(Deserialize, Debug)] @@ -128,21 +138,45 @@ impl IoxGetRequest { }) } - fn decode_protobuf(ticket: &[u8]) -> Result { - let read_info = proto::ReadInfo::decode(Bytes::from(ticket.to_vec()))?; + fn decode_protobuf(ticket: Bytes) -> Result { + let read_info = proto::ReadInfo::decode(ticket)?; let query_type = read_info.query_type(); let proto::ReadInfo { namespace_name, sql_query, query_type: _, + flightsql_command, } = read_info; Ok(Self { namespace_name, query: match query_type { - QueryType::Unspecified | QueryType::Sql => RunQuery::Sql(sql_query), - QueryType::InfluxQl => RunQuery::InfluxQL(sql_query), + QueryType::Unspecified | QueryType::Sql => { + if !flightsql_command.is_empty() { + return Err(DecodeError::new( + "QueryType::Sql contained non empty flightsql_command", + )); + } + RunQuery::Sql(sql_query) + } + QueryType::InfluxQl => { + if !flightsql_command.is_empty() { + return Err(DecodeError::new( + "QueryType::InfluxQl contained non empty flightsql_command", + )); + } + RunQuery::InfluxQL(sql_query) + } + QueryType::FlightSqlMessage => { + if !sql_query.is_empty() { + return Err(DecodeError::new( + "QueryType::FlightSqlMessage contained non empty sql_query", + )); + } + let cmd = prost::Message::decode(flightsql_command.as_ref())?; + RunQuery::FlightSql(cmd) + } }, }) } @@ -158,6 +192,7 @@ impl IoxGetRequest { #[cfg(test)] mod tests { + use arrow_flight::sql::CommandStatementQuery; use assert_matches::assert_matches; use generated_types::influxdata::iox::querier::v1::read_info::QueryType; @@ -192,6 +227,7 @@ mod tests { namespace_name: "_".to_string(), sql_query: "SELECT 1".to_string(), query_type: QueryType::Unspecified.into(), + flightsql_command: vec![], }); // Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL @@ -206,6 +242,7 @@ mod tests { namespace_name: "_".to_string(), sql_query: "SELECT 1".to_string(), query_type: QueryType::Sql.into(), + flightsql_command: vec![], }); let ri = IoxGetRequest::try_decode(ticket).unwrap(); @@ -219,6 +256,7 @@ mod tests { namespace_name: "_".to_string(), sql_query: "SELECT 1".to_string(), query_type: QueryType::InfluxQl.into(), + flightsql_command: vec![], }); let ri = IoxGetRequest::try_decode(ticket).unwrap(); @@ -231,7 +269,8 @@ mod tests { let ticket = make_proto_ticket(&proto::ReadInfo { namespace_name: "_".to_string(), sql_query: "SELECT 1".into(), - query_type: 3, // not a known query type + query_type: 42, // not a known query type + flightsql_command: vec![], }); // Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL @@ -240,6 +279,48 @@ mod tests { assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1")); } + #[test] + fn proto_ticket_decoding_sql_too_many_fields() { + let ticket = make_proto_ticket(&proto::ReadInfo { + namespace_name: "_".to_string(), + sql_query: "SELECT 1".to_string(), + query_type: QueryType::Sql.into(), + // can't have both sql_query and flightsql + flightsql_command: vec![1, 2, 3], + }); + + let e = IoxGetRequest::try_decode(ticket).unwrap_err(); + assert_matches!(e, Error::Invalid); + } + + #[test] + fn proto_ticket_decoding_influxql_too_many_fields() { + let ticket = make_proto_ticket(&proto::ReadInfo { + namespace_name: "_".to_string(), + sql_query: "SELECT 1".to_string(), + query_type: QueryType::InfluxQl.into(), + // can't have both sql_query and flightsql + flightsql_command: vec![1, 2, 3], + }); + + let e = IoxGetRequest::try_decode(ticket).unwrap_err(); + assert_matches!(e, Error::Invalid); + } + + #[test] + fn proto_ticket_decoding_flightsql_too_many_fields() { + let ticket = make_proto_ticket(&proto::ReadInfo { + namespace_name: "_".to_string(), + sql_query: "SELECT 1".to_string(), + query_type: QueryType::FlightSqlMessage.into(), + // can't have both sql_query and flightsql + flightsql_command: vec![1, 2, 3], + }); + + let e = IoxGetRequest::try_decode(ticket).unwrap_err(); + assert_matches!(e, Error::Invalid); + } + #[test] fn proto_ticket_decoding_error() { let ticket = Ticket { @@ -279,6 +360,25 @@ mod tests { assert_eq!(request, roundtripped) } + #[test] + fn round_trip_flightsql() { + let cmd = Any::pack(&CommandStatementQuery { + query: "select * from foo".into(), + }) + .unwrap(); + + let request = IoxGetRequest { + namespace_name: "foo_blarg".into(), + query: RunQuery::FlightSql(cmd), + }; + + let ticket = request.clone().try_encode().expect("encoding failed"); + + let roundtripped = IoxGetRequest::try_decode(ticket).expect("decode failed"); + + assert_eq!(request, roundtripped) + } + fn make_proto_ticket(read_info: &proto::ReadInfo) -> Ticket { Ticket { ticket: read_info.encode_to_vec().into(),