refactor: Improve Flight API server side code and comments (#6395)

* refactor: Improve Flight API server side code and comments

* refactor: revert &str signature in FlightService::run_query
pull/24376/head
Andrew Lamb 2022-12-15 09:10:58 -05:00 committed by GitHub
parent a5d693eba2
commit 78aba66ca2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 250 additions and 179 deletions

View File

@ -1,4 +1,6 @@
//! Implements the native gRPC IOx query API using Arrow Flight //! Implements the InfluxDB IOx Flight API using Arrow Flight and gRPC
mod request;
use arrow::error::ArrowError; use arrow::error::ArrowError;
use arrow_flight::{ use arrow_flight::{
@ -9,24 +11,21 @@ use arrow_flight::{
use arrow_util::optimize::{ use arrow_util::optimize::{
prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response, prepare_batch_for_flight, prepare_schema_for_flight, split_batch_for_grpc_response,
}; };
use bytes::{Bytes, BytesMut}; use bytes::BytesMut;
use data_types::NamespaceNameError; use data_types::NamespaceNameError;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use futures::{SinkExt, Stream, StreamExt}; use futures::{SinkExt, Stream, StreamExt};
use generated_types::influxdata::iox::querier::v1 as proto; use generated_types::influxdata::iox::querier::v1 as proto;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
use iox_query::{ use iox_query::{
exec::{ExecutionContextProvider, IOxSessionContext}, exec::{ExecutionContextProvider, IOxSessionContext},
QueryCompletedToken, QueryNamespace, QueryCompletedToken, QueryNamespace,
}; };
use observability_deps::tracing::{debug, info, warn}; use observability_deps::tracing::{debug, info, warn};
use pin_project::{pin_project, pinned_drop}; use pin_project::{pin_project, pinned_drop};
use prost::Message; use request::{IoxGetRequest, RunQuery};
use serde::Deserialize;
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider}; use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider};
use snafu::{ResultExt, Snafu}; use snafu::{ResultExt, Snafu};
use std::fmt::{Display, Formatter}; use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant};
use std::{fmt, fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tonic::{Request, Response, Streaming}; use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt}; use trace::{ctx::SpanContext, span::SpanExt};
@ -36,11 +35,8 @@ use tracker::InstrumentedAsyncOwnedSemaphorePermit;
#[allow(clippy::enum_variant_names)] #[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("Invalid ticket. Error: {:?}", source))] #[snafu(display("Invalid ticket. Error: {}", source))]
InvalidTicket { source: prost::DecodeError }, InvalidTicket { source: request::Error },
#[snafu(display("Invalid JSON ticket. Error: {:?}", source))]
InvalidJsonTicket { source: std::string::FromUtf8Error },
#[snafu(display("Invalid query, could not parse '{}': {}", query, source))] #[snafu(display("Invalid query, could not parse '{}': {}", query, source))]
InvalidQuery { InvalidQuery {
@ -87,7 +83,6 @@ impl From<Error> for tonic::Status {
match err { match err {
Error::NamespaceNotFound { .. } Error::NamespaceNotFound { .. }
| Error::InvalidTicket { .. } | Error::InvalidTicket { .. }
| Error::InvalidJsonTicket { .. }
| Error::InvalidQuery { .. } | Error::InvalidQuery { .. }
// TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development // TODO(edd): this should be `debug`. Keeping at info whilst IOx still in early development
| Error::InvalidNamespaceName { .. } => info!(e=%err, msg), | Error::InvalidNamespaceName { .. } => info!(e=%err, msg),
@ -108,7 +103,6 @@ impl Error {
let code = match self { let code = match self {
Self::NamespaceNotFound { .. } => tonic::Code::NotFound, Self::NamespaceNotFound { .. } => tonic::Code::NotFound,
Self::InvalidTicket { .. } Self::InvalidTicket { .. }
| Self::InvalidJsonTicket { .. }
| Self::InvalidQuery { .. } | Self::InvalidQuery { .. }
| Self::InvalidNamespaceName { .. } => tonic::Code::InvalidArgument, | Self::InvalidNamespaceName { .. } => tonic::Code::InvalidArgument,
Self::Planning { source, .. } | Self::Query { source, .. } => { Self::Planning { source, .. } | Self::Query { source, .. } => {
@ -123,68 +117,6 @@ impl Error {
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync + 'static>>; type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync + 'static>>;
#[derive(Deserialize, Debug)]
/// Body of the `Ticket` serialized and sent to the do_get endpoint.
struct ReadInfo {
namespace_name: String,
query: Query,
}
#[derive(Deserialize, Debug, Clone)]
enum Query {
Sql(String),
InfluxQL(String),
}
impl Display for Query {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Sql(s) => fmt::Display::fmt(s, f),
Self::InfluxQL(s) => fmt::Display::fmt(s, f),
}
}
}
impl ReadInfo {
/// The Go clients still use JSON tickets. See:
///
/// - <https://github.com/influxdata/influxdb-iox-client-go/commit/2e7a3b0bd47caab7f1a31a1bbe0ff54aa9486b7b>
/// - <https://github.com/influxdata/influxdb-iox-client-go/commit/52f1a1b8d5bb8cc8dc2fe825f4da630ad0b9167c>
///
/// Go clients are unable to execute InfluxQL queries until the JSON structure is updated
/// accordingly.
fn decode_json(ticket: &[u8]) -> Result<Self> {
let json_str = String::from_utf8(ticket.to_vec()).context(InvalidJsonTicketSnafu {})?;
#[derive(Deserialize, Debug)]
struct ReadInfoJson {
namespace_name: String,
sql_query: String,
}
let read_info: ReadInfoJson =
serde_json::from_str(&json_str).context(InvalidQuerySnafu { query: &json_str })?;
Ok(Self {
namespace_name: read_info.namespace_name,
query: Query::Sql(read_info.sql_query), // JSON is always SQL
})
}
fn decode_protobuf(ticket: &[u8]) -> Result<Self> {
let read_info =
proto::ReadInfo::decode(Bytes::from(ticket.to_vec())).context(InvalidTicketSnafu {})?;
Ok(Self {
namespace_name: read_info.namespace_name.clone(),
query: match read_info.query_type() {
QueryType::Unspecified | QueryType::Sql => Query::Sql(read_info.sql_query),
QueryType::InfluxQl => Query::InfluxQL(read_info.sql_query),
},
})
}
}
/// Concrete implementation of the gRPC Arrow Flight Service API /// Concrete implementation of the gRPC Arrow Flight Service API
#[derive(Debug)] #[derive(Debug)]
struct FlightService<S> struct FlightService<S>
@ -209,7 +141,7 @@ where
&self, &self,
span_ctx: Option<SpanContext>, span_ctx: Option<SpanContext>,
permit: InstrumentedAsyncOwnedSemaphorePermit, permit: InstrumentedAsyncOwnedSemaphorePermit,
query: Query, query: &RunQuery,
namespace: String, namespace: String,
) -> Result<Response<TonicStream<FlightData>>, tonic::Status> { ) -> Result<Response<TonicStream<FlightData>>, tonic::Status> {
let db = self let db = self
@ -220,7 +152,7 @@ where
let ctx = db.new_query_context(span_ctx); let ctx = db.new_query_context(span_ctx);
let (query_completed_token, physical_plan) = match query { let (query_completed_token, physical_plan) = match query {
Query::Sql(sql_query) => { RunQuery::Sql(sql_query) => {
let token = db.record_query(&ctx, "sql", Box::new(sql_query.clone())); let token = db.record_query(&ctx, "sql", Box::new(sql_query.clone()));
let plan = Planner::new(&ctx) let plan = Planner::new(&ctx)
.sql(sql_query) .sql(sql_query)
@ -228,7 +160,7 @@ where
.context(PlanningSnafu)?; .context(PlanningSnafu)?;
(token, plan) (token, plan)
} }
Query::InfluxQL(sql_query) => { RunQuery::InfluxQL(sql_query) => {
let token = db.record_query(&ctx, "influxql", Box::new(sql_query.clone())); let token = db.record_query(&ctx, "influxql", Box::new(sql_query.clone()));
let plan = Planner::new(&ctx) let plan = Planner::new(&ctx)
.influxql(db, sql_query) .influxql(db, sql_query)
@ -274,19 +206,16 @@ where
let span_ctx: Option<SpanContext> = request.extensions().get().cloned(); let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let ticket = request.into_inner(); let ticket = request.into_inner();
// decode ticket // attempt to decode ticket
let read_info = ReadInfo::decode_protobuf(&ticket.ticket).or_else(|_e| { let request = IoxGetRequest::try_new(ticket).context(InvalidTicketSnafu);
// try json
ReadInfo::decode_json(&ticket.ticket)
});
if let Err(e) = &read_info { if let Err(e) = &request {
info!(%e, "Error decoding namespace and SQL query name from flight ticket"); info!(%e, "Error decoding Flight API ticket");
}; };
let ReadInfo {
namespace_name, let request = request?;
query: sql_query, let namespace_name = request.namespace_name();
} = read_info?; let query = request.query();
let permit = self let permit = self
.server .server
@ -295,17 +224,17 @@ where
// Log after we acquire the permit and are about to start execution // Log after we acquire the permit and are about to start execution
let start = Instant::now(); let start = Instant::now();
info!(%namespace_name, %sql_query, %trace, "Running SQL via flight do_get"); info!(%namespace_name, %query, %trace, "Running SQL via flight do_get");
let response = self let response = self
.run_query(span_ctx, permit, sql_query.clone(), namespace_name.clone()) .run_query(span_ctx, permit, query, namespace_name.to_string())
.await; .await;
if let Err(e) = &response { if let Err(e) = &response {
info!(%namespace_name, %sql_query, %trace, %e, "Error running SQL query"); info!(%namespace_name, %query, %trace, %e, "Error running SQL query");
} else { } else {
let elapsed = Instant::now() - start; let elapsed = Instant::now() - start;
debug!(%namespace_name,%sql_query,%trace, ?elapsed, "Completed SQL query successfully"); debug!(%namespace_name, %query,%trace, ?elapsed, "Completed SQL query successfully");
} }
response response
} }
@ -512,98 +441,13 @@ impl Stream for GetStream {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use assert_matches::assert_matches;
use futures::Future; use futures::Future;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
use metric::{Attributes, Metric, U64Gauge}; use metric::{Attributes, Metric, U64Gauge};
use service_common::test_util::TestDatabaseStore; use service_common::test_util::TestDatabaseStore;
use tokio::pin; use tokio::pin;
use super::*; use super::*;
#[test]
fn json_ticket_decoding() {
// The Go clients still use JSON tickets. See:
//
// - <https://github.com/influxdata/influxdb-iox-client-go/commit/2e7a3b0bd47caab7f1a31a1bbe0ff54aa9486b7b>
// - <https://github.com/influxdata/influxdb-iox-client-go/commit/52f1a1b8d5bb8cc8dc2fe825f4da630ad0b9167c
//
// Do not change this test without having first changed what the Go clients are sending!
let ticket = Ticket {
ticket: br#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#.to_vec(),
};
let read_info = ReadInfo::decode_json(&ticket.ticket).unwrap();
assert_eq!(read_info.namespace_name, "my_db");
assert_matches!(read_info.query, Query::Sql(query) => assert_eq!(query, "SELECT 1;"));
}
#[test]
fn test_read_info_decoding() {
let mut buf = Vec::with_capacity(1024);
proto::ReadInfo::encode(
&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Unspecified.into(),
},
&mut buf,
)
.unwrap();
let ri = ReadInfo::decode_protobuf(&buf).unwrap();
assert_eq!(ri.namespace_name, "<foo>_<bar>");
assert_matches!(ri.query, Query::Sql(query) => assert_eq!(query, "SELECT 1"));
let mut buf = Vec::with_capacity(1024);
proto::ReadInfo::encode(
&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Sql.into(),
},
&mut buf,
)
.unwrap();
let ri = ReadInfo::decode_protobuf(&buf).unwrap();
assert_eq!(ri.namespace_name, "<foo>_<bar>");
assert_matches!(ri.query, Query::Sql(query) => assert_eq!(query, "SELECT 1"));
let mut buf = Vec::with_capacity(1024);
proto::ReadInfo::encode(
&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::InfluxQl.into(),
},
&mut buf,
)
.unwrap();
let ri = ReadInfo::decode_protobuf(&buf).unwrap();
assert_eq!(ri.namespace_name, "<foo>_<bar>");
assert_matches!(ri.query, Query::InfluxQL(query) => assert_eq!(query, "SELECT 1"));
// Fallible
let mut buf = Vec::with_capacity(1024);
proto::ReadInfo::encode(
&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".into(),
query_type: 3,
},
&mut buf,
)
.unwrap();
// Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL
let ri = ReadInfo::decode_protobuf(&buf).unwrap();
assert_eq!(ri.namespace_name, "<foo>_<bar>");
assert_matches!(ri.query, Query::Sql(query) => assert_eq!(query, "SELECT 1"));
}
#[tokio::test] #[tokio::test]
async fn test_query_semaphore() { async fn test_query_semaphore() {
let semaphore_size = 2; let semaphore_size = 2;

View File

@ -0,0 +1,227 @@
//! Ticket handling for the native IOx Flight API
use arrow_flight::Ticket;
use bytes::Bytes;
use generated_types::influxdata::iox::querier::v1 as proto;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
use observability_deps::tracing::trace;
use prost::Message;
use serde::Deserialize;
use snafu::Snafu;
use std::fmt::{Debug, Display, Formatter};
/// Errors produced while decoding a Flight `Ticket` into an [`IoxGetRequest`].
#[derive(Debug, Snafu)]
pub enum Error {
/// The ticket payload could not be decoded. This is the only error
/// [`IoxGetRequest::try_new`] returns; the underlying decode failures are
/// logged there rather than carried in this variant.
#[snafu(display("Invalid ticket"))]
Invalid,
}
/// Result alias for this module whose error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Flight requests to the IOx Flight DoGet endpoint contain a
/// serialized `Ticket` which describes the request.
///
/// This structure encapsulates the deserialization (and eventual
/// serialization) logic for these requests.
#[derive(Deserialize, Debug)]
pub struct IoxGetRequest {
/// Namespace name decoded from the ticket
namespace_name: String,
/// Query decoded from the ticket, tagged with its query language
query: RunQuery,
}
/// The query text carried by an [`IoxGetRequest`], tagged by query language.
#[derive(Deserialize, Debug, Clone)]
pub enum RunQuery {
/// Unparameterized SQL query
Sql(String),
/// InfluxQL query
InfluxQL(String),
// Coming Soon (Prepared Statement support)
}
impl Display for RunQuery {
    /// Formats the request as its raw query text; the query language is not
    /// included in the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Every variant wraps the query string, so pick it out once and
        // delegate to `String`'s own `Display` implementation.
        let text = match self {
            Self::Sql(text) | Self::InfluxQL(text) => text,
        };
        Display::fmt(text, f)
    }
}
impl IoxGetRequest {
/// try to decode a ReadInfo structure from a Token
pub fn try_new(ticket: Ticket) -> Result<Self> {
// decode ticket
IoxGetRequest::decode_protobuf(&ticket.ticket)
.or_else(|e| {
trace!(%e, ticket=%String::from_utf8_lossy(&ticket.ticket),
"Error decoding ticket as ProtoBuf, trying as JSON");
IoxGetRequest::decode_json(&ticket.ticket)
})
.map_err(|e| {
trace!(%e, "Error decoding ticket as JSON");
Error::Invalid
})
}
/// The Go clients still use an older form of ticket encoding, JSON tickets
///
/// - <https://github.com/influxdata/influxdb-iox-client-go/commit/2e7a3b0bd47caab7f1a31a1bbe0ff54aa9486b7b>
/// - <https://github.com/influxdata/influxdb-iox-client-go/commit/52f1a1b8d5bb8cc8dc2fe825f4da630ad0b9167c>
///
/// Go clients are unable to execute InfluxQL queries until the JSON structure is updated
/// accordingly.
fn decode_json(ticket: &[u8]) -> Result<Self, String> {
let json_str = String::from_utf8(ticket.to_vec()).map_err(|_| "Not UTF8".to_string())?;
#[derive(Deserialize, Debug)]
struct ReadInfoJson {
namespace_name: String,
sql_query: String,
}
let ReadInfoJson {
namespace_name,
sql_query,
} = serde_json::from_str(&json_str).map_err(|e| format!("JSON parse error: {}", e))?;
Ok(Self {
namespace_name,
/// Old JSON format is always SQL
query: RunQuery::Sql(sql_query),
})
}
fn decode_protobuf(ticket: &[u8]) -> Result<Self, prost::DecodeError> {
let read_info = proto::ReadInfo::decode(Bytes::from(ticket.to_vec()))?;
let query_type = read_info.query_type();
let proto::ReadInfo {
namespace_name,
sql_query,
query_type: _,
} = read_info;
Ok(Self {
namespace_name,
query: match query_type {
QueryType::Unspecified | QueryType::Sql => RunQuery::Sql(sql_query),
QueryType::InfluxQl => RunQuery::InfluxQL(sql_query),
},
})
}
pub fn namespace_name(&self) -> &str {
self.namespace_name.as_ref()
}
pub fn query(&self) -> &RunQuery {
&self.query
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
use super::*;
#[test]
fn json_ticket_decoding() {
// The Go clients still use JSON tickets. See:
//
// - <https://github.com/influxdata/influxdb-iox-client-go/commit/2e7a3b0bd47caab7f1a31a1bbe0ff54aa9486b7b>
// - <https://github.com/influxdata/influxdb-iox-client-go/commit/52f1a1b8d5bb8cc8dc2fe825f4da630ad0b9167c>
//
// Do not change this test without having first changed what the Go clients are sending!
let ticket = make_json_ticket(r#"{"namespace_name": "my_db", "sql_query": "SELECT 1;"}"#);
let ri = IoxGetRequest::try_new(ticket).unwrap();
assert_eq!(ri.namespace_name, "my_db");
assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1;"));
}
#[test]
fn json_ticket_decoding_error() {
// well-formed JSON, but with the wrong key (`database_name` rather than `namespace_name`)
let ticket = make_json_ticket(r#"{"database_name": "my_db", "sql_query": "SELECT 1;"}"#);
let e = IoxGetRequest::try_new(ticket).unwrap_err();
assert_matches!(e, Error::Invalid);
}
#[test]
fn proto_ticket_decoding_unspecified() {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Unspecified.into(),
});
// An unspecified query type is treated as SQL
let ri = IoxGetRequest::try_new(ticket).unwrap();
assert_eq!(ri.namespace_name, "<foo>_<bar>");
assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1"));
}
#[test]
fn proto_ticket_decoding_sql() {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Sql.into(),
});
let ri = IoxGetRequest::try_new(ticket).unwrap();
assert_eq!(ri.namespace_name, "<foo>_<bar>");
assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1"));
}
#[test]
fn proto_ticket_decoding_influxql() {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::InfluxQl.into(),
});
let ri = IoxGetRequest::try_new(ticket).unwrap();
assert_eq!(ri.namespace_name, "<foo>_<bar>");
assert_matches!(ri.query, RunQuery::InfluxQL(query) => assert_eq!(query, "SELECT 1"));
}
#[test]
fn proto_ticket_decoding_too_new() {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".into(),
query_type: 3, // not a known query type
});
// Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL
let ri = IoxGetRequest::try_new(ticket).unwrap();
assert_eq!(ri.namespace_name, "<foo>_<bar>");
assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1"));
}
#[test]
fn proto_ticket_decoding_error() {
let ticket = Ticket {
ticket: b"invalid ticket".to_vec(),
};
// The payload decodes as neither protobuf nor JSON, so the ticket is rejected
let e = IoxGetRequest::try_new(ticket).unwrap_err();
assert_matches!(e, Error::Invalid);
}
// Test helper: encode `read_info` as protobuf and wrap the bytes in a `Ticket`
fn make_proto_ticket(read_info: &proto::ReadInfo) -> Ticket {
let mut buf = Vec::with_capacity(1024);
proto::ReadInfo::encode(read_info, &mut buf).unwrap();
Ticket { ticket: buf }
}
// Test helper: wrap the raw bytes of `json` in a `Ticket`
fn make_json_ticket(json: &str) -> Ticket {
Ticket {
ticket: json.as_bytes().to_vec(),
}
}
}