From 78aba66ca286655c9a664a50e569d9029610692c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 15 Dec 2022 09:10:58 -0500 Subject: [PATCH] refactor: Improve Flight API server side code and comments (#6395) * refactor: Improve Flight API server side code and comments * refactor: revert &str signature in FlightService::run_query --- service_grpc_flight/src/lib.rs | 202 +++---------------------- service_grpc_flight/src/request.rs | 227 +++++++++++++++++++++++++++++ 2 files changed, 250 insertions(+), 179 deletions(-) create mode 100644 service_grpc_flight/src/request.rs diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 0dadd33d1f..a602ea5abb 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -1,4 +1,6 @@ -//! Implements the native gRPC IOx query API using Arrow Flight +//! Implements the InfluxDB IOx Flight API using Arrow Flight and gRPC + +mod request; use arrow::error::ArrowError; use arrow_flight::{ @@ -9,24 +11,21 @@ use arrow_flight::{ use arrow_util::optimize::{ prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response, }; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use data_types::NamespaceNameError; use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; use futures::{SinkExt, Stream, StreamExt}; use generated_types::influxdata::iox::querier::v1 as proto; -use generated_types::influxdata::iox::querier::v1::read_info::QueryType; use iox_query::{ exec::{ExecutionContextProvider, IOxSessionContext}, QueryCompletedToken, QueryNamespace, }; use observability_deps::tracing::{debug, info, warn}; use pin_project::{pin_project, pinned_drop}; -use prost::Message; -use serde::Deserialize; +use request::{IoxGetRequest, RunQuery}; use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider}; use snafu::{ResultExt, Snafu}; -use std::fmt::{Display, Formatter}; -use std::{fmt, fmt::Debug, pin::Pin, sync::Arc, task::Poll, 
time::Instant}; +use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant}; use tokio::task::JoinHandle; use tonic::{Request, Response, Streaming}; use trace::{ctx::SpanContext, span::SpanExt}; @@ -36,11 +35,8 @@ use tracker::InstrumentedAsyncOwnedSemaphorePermit; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Invalid ticket. Error: {:?}", source))] - InvalidTicket { source: prost::DecodeError }, - - #[snafu(display("Invalid JSON ticket. Error: {:?}", source))] - InvalidJsonTicket { source: std::string::FromUtf8Error }, + #[snafu(display("Invalid ticket. Error: {}", source))] + InvalidTicket { source: request::Error }, #[snafu(display("Invalid query, could not parse '{}': {}", query, source))] InvalidQuery { @@ -87,7 +83,6 @@ impl From for tonic::Status { match err { Error::NamespaceNotFound { .. } | Error::InvalidTicket { .. } - | Error::InvalidJsonTicket { .. } | Error::InvalidQuery { .. } // TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development | Error::InvalidNamespaceName { .. } => info!(e=%err, msg), @@ -108,7 +103,6 @@ impl Error { let code = match self { Self::NamespaceNotFound { .. } => tonic::Code::NotFound, Self::InvalidTicket { .. } - | Self::InvalidJsonTicket { .. } | Self::InvalidQuery { .. } | Self::InvalidNamespaceName { .. } => tonic::Code::InvalidArgument, Self::Planning { source, .. } | Self::Query { source, .. } => { @@ -123,68 +117,6 @@ impl Error { type TonicStream = Pin> + Send + Sync + 'static>>; -#[derive(Deserialize, Debug)] -/// Body of the `Ticket` serialized and sent to the do_get endpoint. 
-struct ReadInfo { - namespace_name: String, - query: Query, -} - -#[derive(Deserialize, Debug, Clone)] -enum Query { - Sql(String), - InfluxQL(String), -} - -impl Display for Query { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::Sql(s) => fmt::Display::fmt(s, f), - Self::InfluxQL(s) => fmt::Display::fmt(s, f), - } - } -} - -impl ReadInfo { - /// The Go clients still use JSON tickets. See: - /// - /// - - /// - - /// - /// Go clients are unable to execute InfluxQL queries until the JSON structure is updated - /// accordingly. - fn decode_json(ticket: &[u8]) -> Result { - let json_str = String::from_utf8(ticket.to_vec()).context(InvalidJsonTicketSnafu {})?; - - #[derive(Deserialize, Debug)] - struct ReadInfoJson { - namespace_name: String, - sql_query: String, - } - - let read_info: ReadInfoJson = - serde_json::from_str(&json_str).context(InvalidQuerySnafu { query: &json_str })?; - - Ok(Self { - namespace_name: read_info.namespace_name, - query: Query::Sql(read_info.sql_query), // JSON is always SQL - }) - } - - fn decode_protobuf(ticket: &[u8]) -> Result { - let read_info = - proto::ReadInfo::decode(Bytes::from(ticket.to_vec())).context(InvalidTicketSnafu {})?; - - Ok(Self { - namespace_name: read_info.namespace_name.clone(), - query: match read_info.query_type() { - QueryType::Unspecified | QueryType::Sql => Query::Sql(read_info.sql_query), - QueryType::InfluxQl => Query::InfluxQL(read_info.sql_query), - }, - }) - } -} - /// Concrete implementation of the gRPC Arrow Flight Service API #[derive(Debug)] struct FlightService @@ -209,7 +141,7 @@ where &self, span_ctx: Option, permit: InstrumentedAsyncOwnedSemaphorePermit, - query: Query, + query: &RunQuery, namespace: String, ) -> Result>, tonic::Status> { let db = self @@ -220,7 +152,7 @@ where let ctx = db.new_query_context(span_ctx); let (query_completed_token, physical_plan) = match query { - Query::Sql(sql_query) => { + RunQuery::Sql(sql_query) => { let token = 
db.record_query(&ctx, "sql", Box::new(sql_query.clone())); let plan = Planner::new(&ctx) .sql(sql_query) @@ -228,7 +160,7 @@ where .context(PlanningSnafu)?; (token, plan) } - Query::InfluxQL(sql_query) => { + RunQuery::InfluxQL(sql_query) => { let token = db.record_query(&ctx, "influxql", Box::new(sql_query.clone())); let plan = Planner::new(&ctx) .influxql(db, sql_query) @@ -274,19 +206,16 @@ where let span_ctx: Option = request.extensions().get().cloned(); let ticket = request.into_inner(); - // decode ticket - let read_info = ReadInfo::decode_protobuf(&ticket.ticket).or_else(|_e| { - // try json - ReadInfo::decode_json(&ticket.ticket) - }); + // attempt to decode ticket + let request = IoxGetRequest::try_new(ticket).context(InvalidTicketSnafu); - if let Err(e) = &read_info { - info!(%e, "Error decoding namespace and SQL query name from flight ticket"); + if let Err(e) = &request { + info!(%e, "Error decoding Flight API ticket"); }; - let ReadInfo { - namespace_name, - query: sql_query, - } = read_info?; + + let request = request?; + let namespace_name = request.namespace_name(); + let query = request.query(); let permit = self .server @@ -295,17 +224,17 @@ where // Log after we acquire the permit and are about to start execution let start = Instant::now(); - info!(%namespace_name, %sql_query, %trace, "Running SQL via flight do_get"); + info!(%namespace_name, %query, %trace, "Running SQL via flight do_get"); let response = self - .run_query(span_ctx, permit, sql_query.clone(), namespace_name.clone()) + .run_query(span_ctx, permit, query, namespace_name.to_string()) .await; if let Err(e) = &response { - info!(%namespace_name, %sql_query, %trace, %e, "Error running SQL query"); + info!(%namespace_name, %query, %trace, %e, "Error running SQL query"); } else { let elapsed = Instant::now() - start; - debug!(%namespace_name,%sql_query,%trace, ?elapsed, "Completed SQL query successfully"); + debug!(%namespace_name, %query,%trace, ?elapsed, "Completed SQL query 
successfully"); } response } @@ -512,98 +441,13 @@ impl Stream for GetStream { #[cfg(test)] mod tests { - use assert_matches::assert_matches; use futures::Future; - use generated_types::influxdata::iox::querier::v1::read_info::QueryType; use metric::{Attributes, Metric, U64Gauge}; use service_common::test_util::TestDatabaseStore; use tokio::pin; use super::*; - #[test] - fn json_ticket_decoding() { - // The Go clients still use JSON tickets. See: - // - // - - // - assert_eq!(query, "SELECT 1;")); - } - - #[test] - fn test_read_info_decoding() { - let mut buf = Vec::with_capacity(1024); - proto::ReadInfo::encode( - &proto::ReadInfo { - namespace_name: "_".to_string(), - sql_query: "SELECT 1".to_string(), - query_type: QueryType::Unspecified.into(), - }, - &mut buf, - ) - .unwrap(); - - let ri = ReadInfo::decode_protobuf(&buf).unwrap(); - assert_eq!(ri.namespace_name, "_"); - assert_matches!(ri.query, Query::Sql(query) => assert_eq!(query, "SELECT 1")); - - let mut buf = Vec::with_capacity(1024); - proto::ReadInfo::encode( - &proto::ReadInfo { - namespace_name: "_".to_string(), - sql_query: "SELECT 1".to_string(), - query_type: QueryType::Sql.into(), - }, - &mut buf, - ) - .unwrap(); - - let ri = ReadInfo::decode_protobuf(&buf).unwrap(); - assert_eq!(ri.namespace_name, "_"); - assert_matches!(ri.query, Query::Sql(query) => assert_eq!(query, "SELECT 1")); - - let mut buf = Vec::with_capacity(1024); - proto::ReadInfo::encode( - &proto::ReadInfo { - namespace_name: "_".to_string(), - sql_query: "SELECT 1".to_string(), - query_type: QueryType::InfluxQl.into(), - }, - &mut buf, - ) - .unwrap(); - - let ri = ReadInfo::decode_protobuf(&buf).unwrap(); - assert_eq!(ri.namespace_name, "_"); - assert_matches!(ri.query, Query::InfluxQL(query) => assert_eq!(query, "SELECT 1")); - - // Fallible - let mut buf = Vec::with_capacity(1024); - proto::ReadInfo::encode( - &proto::ReadInfo { - namespace_name: "_".to_string(), - sql_query: "SELECT 1".into(), - query_type: 3, - }, - &mut 
buf, - ) - .unwrap(); - - // Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL - let ri = ReadInfo::decode_protobuf(&buf).unwrap(); - assert_eq!(ri.namespace_name, "_"); - assert_matches!(ri.query, Query::Sql(query) => assert_eq!(query, "SELECT 1")); - } - #[tokio::test] async fn test_query_semaphore() { let semaphore_size = 2; diff --git a/service_grpc_flight/src/request.rs b/service_grpc_flight/src/request.rs new file mode 100644 index 0000000000..9faa551fe3 --- /dev/null +++ b/service_grpc_flight/src/request.rs @@ -0,0 +1,227 @@ +//! Ticket handling for the native IOx Flight API + +use arrow_flight::Ticket; +use bytes::Bytes; +use generated_types::influxdata::iox::querier::v1 as proto; +use generated_types::influxdata::iox::querier::v1::read_info::QueryType; +use observability_deps::tracing::trace; +use prost::Message; +use serde::Deserialize; +use snafu::Snafu; +use std::fmt::{Debug, Display, Formatter}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Invalid ticket"))] + Invalid, +} +pub type Result = std::result::Result; + +/// Flight requests to the IOx Flight DoGet endpoint contain a +/// serialized `Ticket` which describes the request. 
+/// +/// This structure encapsulates the deserialization (and eventual +/// serializing) logic for these requests +#[derive(Deserialize, Debug)] +pub struct IoxGetRequest { + namespace_name: String, + query: RunQuery, +} + +#[derive(Deserialize, Debug, Clone)] +pub enum RunQuery { + /// Unparameterized SQL query + Sql(String), + /// InfluxQL + InfluxQL(String), + // Coming Soon (Prepared Statement support) +} + +impl Display for RunQuery { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Sql(s) => Display::fmt(s, f), + Self::InfluxQL(s) => Display::fmt(s, f), + } + } +} + +impl IoxGetRequest { + /// Try to decode an `IoxGetRequest` from a Flight `Ticket` (protobuf first, then JSON) + pub fn try_new(ticket: Ticket) -> Result { + // decode ticket + IoxGetRequest::decode_protobuf(&ticket.ticket) + .or_else(|e| { + trace!(%e, ticket=%String::from_utf8_lossy(&ticket.ticket), + "Error decoding ticket as ProtoBuf, trying as JSON"); + IoxGetRequest::decode_json(&ticket.ticket) + }) + .map_err(|e| { + trace!(%e, "Error decoding ticket as JSON"); + Error::Invalid + }) + } + + /// The Go clients still use an older form of ticket encoding, JSON tickets + /// + /// - + /// - + /// + /// Go clients are unable to execute InfluxQL queries until the JSON structure is updated + /// accordingly. 
+ fn decode_json(ticket: &[u8]) -> Result { + let json_str = String::from_utf8(ticket.to_vec()).map_err(|_| "Not UTF8".to_string())?; + + #[derive(Deserialize, Debug)] + struct ReadInfoJson { + namespace_name: String, + sql_query: String, + } + + let ReadInfoJson { + namespace_name, + sql_query, + } = serde_json::from_str(&json_str).map_err(|e| format!("JSON parse error: {}", e))?; + + Ok(Self { + namespace_name, + /// Old JSON format is always SQL + query: RunQuery::Sql(sql_query), + }) + } + + fn decode_protobuf(ticket: &[u8]) -> Result { + let read_info = proto::ReadInfo::decode(Bytes::from(ticket.to_vec()))?; + + let query_type = read_info.query_type(); + let proto::ReadInfo { + namespace_name, + sql_query, + query_type: _, + } = read_info; + + Ok(Self { + namespace_name, + query: match query_type { + QueryType::Unspecified | QueryType::Sql => RunQuery::Sql(sql_query), + QueryType::InfluxQl => RunQuery::InfluxQL(sql_query), + }, + }) + } + + pub fn namespace_name(&self) -> &str { + self.namespace_name.as_ref() + } + + pub fn query(&self) -> &RunQuery { + &self.query + } +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use generated_types::influxdata::iox::querier::v1::read_info::QueryType; + + use super::*; + + #[test] + fn json_ticket_decoding() { + // The Go clients still use JSON tickets. 
See: + // + // - + // - assert_eq!(query, "SELECT 1;")); + } + + #[test] + fn json_ticket_decoding_error() { + // invalid json (database name rather than namespace name) + let ticket = make_json_ticket(r#"{"database_name": "my_db", "sql_query": "SELECT 1;"}"#); + let e = IoxGetRequest::try_new(ticket).unwrap_err(); + assert_matches!(e, Error::Invalid); + } + + #[test] + fn proto_ticket_decoding_unspecified() { + let ticket = make_proto_ticket(&proto::ReadInfo { + namespace_name: "_".to_string(), + sql_query: "SELECT 1".to_string(), + query_type: QueryType::Unspecified.into(), + }); + + // An unspecified query_type is treated as SQL + let ri = IoxGetRequest::try_new(ticket).unwrap(); + assert_eq!(ri.namespace_name, "_"); + assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1")); + } + + #[test] + fn proto_ticket_decoding_sql() { + let ticket = make_proto_ticket(&proto::ReadInfo { + namespace_name: "_".to_string(), + sql_query: "SELECT 1".to_string(), + query_type: QueryType::Sql.into(), + }); + + let ri = IoxGetRequest::try_new(ticket).unwrap(); + assert_eq!(ri.namespace_name, "_"); + assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1")); + } + + #[test] + fn proto_ticket_decoding_influxql() { + let ticket = make_proto_ticket(&proto::ReadInfo { + namespace_name: "_".to_string(), + sql_query: "SELECT 1".to_string(), + query_type: QueryType::InfluxQl.into(), + }); + + let ri = IoxGetRequest::try_new(ticket).unwrap(); + assert_eq!(ri.namespace_name, "_"); + assert_matches!(ri.query, RunQuery::InfluxQL(query) => assert_eq!(query, "SELECT 1")); + } + + #[test] + fn proto_ticket_decoding_too_new() { + let ticket = make_proto_ticket(&proto::ReadInfo { + namespace_name: "_".to_string(), + sql_query: "SELECT 1".into(), + query_type: 3, // not a known query type + }); + + // Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL + let ri = 
IoxGetRequest::try_new(ticket).unwrap(); + assert_eq!(ri.namespace_name, "_"); + assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1")); + } + + #[test] + fn proto_ticket_decoding_error() { + let ticket = Ticket { + ticket: b"invalid ticket".to_vec(), + }; + + // Not decodable as either protobuf or JSON, so the ticket is rejected + let e = IoxGetRequest::try_new(ticket).unwrap_err(); + assert_matches!(e, Error::Invalid); + } + + fn make_proto_ticket(read_info: &proto::ReadInfo) -> Ticket { + let mut buf = Vec::with_capacity(1024); + proto::ReadInfo::encode(read_info, &mut buf).unwrap(); + Ticket { ticket: buf } + } + + fn make_json_ticket(json: &str) -> Ticket { + Ticket { + ticket: json.as_bytes().to_vec(), + } + } +}