Merge branch 'main' into dom/rpc-endpoint-metrics

pull/24376/head
Luke Bond 2023-01-25 10:44:18 +11:00 committed by GitHub
commit caea42665b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 825 additions and 243 deletions

23
Cargo.lock generated
View File

@ -1790,6 +1790,24 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flightsql"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-flight",
"bytes",
"datafusion",
"futures",
"iox_query",
"observability_deps",
"prost 0.11.6",
"snafu",
"tokio",
"tonic",
"workspace-hack",
]
[[package]]
name = "float-cmp"
version = "0.9.0"
@ -5103,8 +5121,11 @@ dependencies = [
name = "service_common"
version = "0.1.0"
dependencies = [
"arrow-flight",
"async-trait",
"bytes",
"datafusion",
"flightsql",
"iox_query",
"metric",
"parking_lot 0.12.1",
@ -5141,13 +5162,13 @@ dependencies = [
"bytes",
"data_types",
"datafusion",
"flightsql",
"futures",
"generated_types",
"iox_query",
"metric",
"observability_deps",
"prost 0.11.6",
"prost-types 0.11.6",
"serde",
"serde_json",
"service_common",

View File

@ -12,7 +12,9 @@ members = [
"datafusion_util",
"dml",
"executor",
"flightsql",
"generated_types",
"garbage_collector",
"grpc-binary-logger-proto",
"grpc-binary-logger-test-proto",
"grpc-binary-logger",
@ -29,7 +31,6 @@ members = [
"ingester2",
"iox_catalog",
"iox_data_generator",
"garbage_collector",
"iox_query",
"iox_tests",
"iox_time",

22
flightsql/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "flightsql"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow = { workspace = true, features = ["prettyprint"] }
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
datafusion = { workspace = true }
observability_deps = { path = "../observability_deps" }
iox_query = { path = "../iox_query" }
# Crates.io dependencies, in alphabetical order
bytes = "1.3"
futures = "0.3"
snafu = "0.7"
prost = "0.11"
tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tonic = "0.8"
workspace-hack = { path = "../workspace-hack"}

4
flightsql/src/lib.rs Normal file
View File

@ -0,0 +1,4 @@
//! InfluxDB IOx implementation of FlightSQL
mod planner;
pub use planner::{Error, FlightSQLPlanner, Result};

169
flightsql/src/planner.rs Normal file
View File

@ -0,0 +1,169 @@
//! FlightSQL handling
use std::{string::FromUtf8Error, sync::Arc};
use arrow::{error::ArrowError, ipc::writer::IpcWriteOptions};
use arrow_flight::{
error::FlightError,
sql::{Any, CommandPreparedStatementQuery, CommandStatementQuery, ProstMessageExt},
IpcMessage, SchemaAsIpc,
};
use bytes::Bytes;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use iox_query::{exec::IOxSessionContext, QueryNamespace};
use observability_deps::tracing::debug;
use prost::Message;
use snafu::{ResultExt, Snafu};
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Invalid protobuf for type_url '{}': {}", type_url, source))]
DeserializationTypeKnown {
type_url: String,
source: prost::DecodeError,
},
#[snafu(display("Query was not valid UTF-8: {}", source))]
InvalidUtf8 { source: FromUtf8Error },
#[snafu(display("{}", source))]
Flight { source: FlightError },
#[snafu(display("{}", source))]
DataFusion { source: DataFusionError },
#[snafu(display("{}", source))]
Arrow { source: ArrowError },
#[snafu(display("Unsupported FlightSQL message type: {}", description))]
UnsupportedMessageType { description: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl From<FlightError> for Error {
fn from(value: FlightError) -> Self {
Self::Flight { source: value }
}
}
impl From<Error> for DataFusionError {
fn from(value: Error) -> Self {
match value {
Error::DataFusion { source } => source,
Error::Arrow { source } => DataFusionError::ArrowError(source),
value => DataFusionError::External(Box::new(value)),
}
}
}
/// Logic for creating plans for various Flight messages against a query database
#[derive(Debug, Default)]
pub struct FlightSQLPlanner {}
impl FlightSQLPlanner {
pub fn new() -> Self {
Self {}
}
/// Returns the schema, in Arrow IPC encoded form, for the request in msg.
pub async fn get_flight_info(
namespace_name: impl Into<String>,
msg: Any,
ctx: &IOxSessionContext,
) -> Result<Bytes> {
let namespace_name = namespace_name.into();
debug!(%namespace_name, type_url=%msg.type_url, "Handling flightsql get_flight_info");
match FlightSQLCommand::try_new(&msg)? {
FlightSQLCommand::CommandStatementQuery(query)
| FlightSQLCommand::CommandPreparedStatementQuery(query) => {
Self::get_schema_for_query(&query, ctx).await
}
}
}
/// Return the schema for the specified query
///
/// returns: IPC encoded (schema_bytes) for this query
async fn get_schema_for_query(query: &str, ctx: &IOxSessionContext) -> Result<Bytes> {
// gather real schema, but only
let logical_plan = ctx.plan_sql(query).await.context(DataFusionSnafu)?;
let schema = arrow::datatypes::Schema::from(logical_plan.schema().as_ref());
let options = IpcWriteOptions::default();
// encode the schema into the correct form
let IpcMessage(schema) = SchemaAsIpc::new(&schema, &options)
.try_into()
.context(ArrowSnafu)?;
Ok(schema)
}
/// Returns a plan that computes results requested in msg
pub async fn do_get(
namespace_name: impl Into<String>,
_database: Arc<dyn QueryNamespace>,
msg: Any,
ctx: &IOxSessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let namespace_name = namespace_name.into();
debug!(%namespace_name, type_url=%msg.type_url, "Handling flightsql plan to run an actual query");
match FlightSQLCommand::try_new(&msg)? {
FlightSQLCommand::CommandStatementQuery(query) => {
debug!(%query, "Planning FlightSQL query");
ctx.prepare_sql(&query).await.context(DataFusionSnafu)
}
FlightSQLCommand::CommandPreparedStatementQuery(query) => {
debug!(%query, "Planning FlightSQL prepared query");
ctx.prepare_sql(&query).await.context(DataFusionSnafu)
}
}
}
}
/// Decoded and validated FlightSQL command
#[derive(Debug, Clone)]
enum FlightSQLCommand {
CommandStatementQuery(String),
CommandPreparedStatementQuery(String),
}
impl FlightSQLCommand {
/// Figure out and decode the specific FlightSQL command in `msg`
fn try_new(msg: &Any) -> Result<Self> {
if let Some(decoded_cmd) = try_unpack::<CommandStatementQuery>(msg)? {
let CommandStatementQuery { query } = decoded_cmd;
Ok(Self::CommandStatementQuery(query))
} else if let Some(decoded_cmd) = try_unpack::<CommandPreparedStatementQuery>(msg)? {
let CommandPreparedStatementQuery {
prepared_statement_handle,
} = decoded_cmd;
// handle should be a decoded query
let query =
String::from_utf8(prepared_statement_handle.to_vec()).context(InvalidUtf8Snafu)?;
Ok(Self::CommandPreparedStatementQuery(query))
} else {
UnsupportedMessageTypeSnafu {
description: &msg.type_url,
}
.fail()
}
}
}
/// try to unpack the [`arrow_flight::sql::Any`] as type `T`, returning Ok(None) if
/// the type is wrong or Err if an error occurs
fn try_unpack<T: ProstMessageExt>(msg: &Any) -> Result<Option<T>> {
// Does the type URL match?
if T::type_url() != msg.type_url {
return Ok(None);
}
// type matched, so try and decode
let m = Message::decode(&*msg.value).context(DeserializationTypeKnownSnafu {
type_url: &msg.type_url,
})?;
Ok(Some(m))
}

View File

@ -3,38 +3,49 @@ package influxdata.iox.querier.v1;
option go_package = "github.com/influxdata/iox/querier/v1";
/*
* Message definitions for the InfluxDB IOx Flight API
* Message definition for the native InfluxDB IOx Flight API
*
* These messages are what is sent to/from an InfluxDB IOx server's
* `DoGet` endpoint as the opaque "Ticket" in Arrow Flight messages.
* ReadInfo is sent to an InfluxDB IOx Querier server's `DoGet` RPC
* method as the opaque "Ticket" in Arrow Flight messages.
*
* The bytes for the Tickets are created by encoding these messages
* using the protobuf binary format.
* Tickets are created by encoding these messages using the protobuf
* binary format.
*
* Clients can construct these Ticket's directly to avoid making two
* requests to run each query
* IOx clients can construct these Tickets directly to avoid making
* two RPC requests as typically required by Arrow Flight (a
* `GetFlightInfo` followed by a `DoGet`).
*/
// Request for an IOx querier to execute a query on a user's behalf.
message ReadInfo {
// Namespace name.
string namespace_name = 1;
// query text (either SQL or InfluxQL, depending on query_type)
// Query text (either SQL or InfluxQL, depending on query_type)
string sql_query = 2;
// A FlightSQL command payload (serialized protobuf bytes). One of
// the messages defined in the [protobuf definition].
//
// [protobuf definition]: https://arrow.apache.org/docs/format/FlightSql.html#protocol-buffer-definitions
bytes flightsql_command = 4;
// The type of query
QueryType query_type = 3;
enum QueryType {
// An unspecified query type. IOx may choose how to interpret sql_query.
QUERY_TYPE_UNSPECIFIED = 0;
// SQL query.
// SQL query. `sql_query` contains a SQL query as text
QUERY_TYPE_SQL = 1;
// InfluxQL query.
// InfluxQL query. `sql_query` contains an InfluxQL query as text
QUERY_TYPE_INFLUX_QL = 2;
// FlightSQL message: `sql_query` is empty, flightsql_command
// contains a serialized FlightSQL message.
QUERY_TYPE_FLIGHT_SQL_MESSAGE = 3;
}
}
// Message included in the DoGet response from the querier

View File

@ -180,6 +180,7 @@ impl Client {
namespace_name,
sql_query,
query_type: QueryType::Sql.into(),
flightsql_command: vec![],
};
self.do_get_with_read_info(request).await
@ -196,6 +197,7 @@ impl Client {
namespace_name,
sql_query: influxql_query,
query_type: QueryType::InfluxQl.into(),
flightsql_command: vec![],
};
self.do_get_with_read_info(request).await

View File

@ -312,16 +312,20 @@ impl IOxSessionContext {
&self.inner
}
/// Plan a SQL statement. This assumes that any tables referenced
/// in the SQL have been registered with this context. Use
/// `prepare_sql` to actually execute the query.
pub async fn plan_sql(&self, sql: &str) -> Result<LogicalPlan> {
let ctx = self.child_ctx("plan_sql");
debug!(text=%sql, "planning SQL query");
// NOTE can not use ctx.inner.sql() here as it also interprets DDL
ctx.inner.state().create_logical_plan(sql).await
}
/// Prepare a SQL statement for execution. This assumes that any
/// tables referenced in the SQL have been registered with this context
pub async fn prepare_sql(&self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
let ctx = self.child_ctx("prepare_sql");
debug!(text=%sql, "planning SQL query");
// NOTE can not use ctx.inner.sql() here as it also interprets DDL
#[allow(deprecated)]
let logical_plan = ctx.inner.state().create_logical_plan(sql).await?;
debug!(plan=%logical_plan.display_graphviz(), "logical plan");
let logical_plan = self.plan_sql(sql).await?;
// Make nicer erorrs for unsupported SQL
// (By default datafusion returns Internal Error)
@ -347,6 +351,7 @@ impl IOxSessionContext {
_ => (),
}
let ctx = self.child_ctx("prepare_sql");
ctx.create_physical_plan(&logical_plan).await
}

View File

@ -27,6 +27,7 @@ pub struct GapFill {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
time_column: Expr,
stride: Expr,
}
impl GapFill {
@ -35,12 +36,14 @@ impl GapFill {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
time_column: Expr,
stride: Expr,
) -> Result<Self> {
Ok(Self {
input,
group_expr,
aggr_expr,
time_column,
stride,
})
}
}
@ -69,8 +72,8 @@ impl UserDefinedLogicalNode for GapFill {
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"GapFill: groupBy=[{:?}], aggr=[{:?}], time_column={}",
self.group_expr, self.aggr_expr, self.time_column
"GapFill: groupBy=[{:?}], aggr=[{:?}], time_column={}, stride={}",
self.group_expr, self.aggr_expr, self.time_column, self.stride
)
}
@ -86,6 +89,7 @@ impl UserDefinedLogicalNode for GapFill {
group_expr,
aggr_expr,
self.time_column.clone(),
self.stride.clone(),
)
.expect("should not fail");
Arc::new(gapfill)
@ -131,11 +135,19 @@ pub(crate) fn plan_gap_fill(
let logical_time_column = gap_fill.time_column.try_into_col()?;
let time_column = Column::new_with_schema(&logical_time_column.name, input_schema)?;
let stride = create_physical_expr(
&gap_fill.stride,
input_dfschema,
input_schema,
execution_props,
)?;
GapFillExec::try_new(
Arc::clone(&physical_inputs[0]),
group_expr,
aggr_expr,
time_column,
stride,
)
}
@ -151,6 +163,8 @@ pub struct GapFillExec {
// The sort expressions for the required sort order of the input:
// all of the group exressions, with the time column being last.
sort_expr: Vec<PhysicalSortExpr>,
// The stride of the date bins
stride: Arc<dyn PhysicalExpr>,
}
impl GapFillExec {
@ -159,6 +173,7 @@ impl GapFillExec {
group_expr: Vec<Arc<dyn PhysicalExpr>>,
aggr_expr: Vec<Arc<dyn PhysicalExpr>>,
time_column: Column,
stride: Arc<dyn PhysicalExpr>,
) -> Result<Self> {
let sort_expr = {
let mut sort_expr: Vec<_> = group_expr
@ -201,6 +216,7 @@ impl GapFillExec {
aggr_expr,
time_column,
sort_expr,
stride,
})
}
}
@ -256,6 +272,7 @@ impl ExecutionPlan for GapFillExec {
self.group_expr.clone(),
self.aggr_expr.clone(),
self.time_column.clone(),
Arc::clone(&self.stride),
)?)),
_ => Err(DataFusionError::Internal(
"GapFillExec wrong number of children".to_string(),
@ -283,9 +300,10 @@ impl ExecutionPlan for GapFillExec {
let aggr_expr: Vec<_> = self.aggr_expr.iter().map(|e| e.to_string()).collect();
write!(
f,
"GapFillExec: group_expr=[{}], aggr_expr=[{}]",
"GapFillExec: group_expr=[{}], aggr_expr=[{}], stride={}",
group_expr.join(", "),
aggr_expr.join(", ")
aggr_expr.join(", "),
self.stride,
)
}
}
@ -307,7 +325,8 @@ mod test {
error::Result,
logical_expr::{logical_plan, Extension},
physical_plan::displayable,
prelude::col,
prelude::{col, lit},
scalar::ScalarValue,
sql::TableReference,
};
@ -339,11 +358,12 @@ mod test {
vec![col("loc"), col("time")],
vec![col("temp")],
col("time"),
lit(ScalarValue::IntervalDayTime(Some(60_000))),
)?;
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(gapfill),
});
let expected = "GapFill: groupBy=[[loc, time]], aggr=[[temp]], time_column=time\
let expected = "GapFill: groupBy=[[loc, time]], aggr=[[temp]], time_column=time, stride=IntervalDayTime(\"60000\")\
\n TableScan: temps";
assert_eq!(expected, format!("{}", plan.display_indent()));
Ok(())
@ -382,7 +402,7 @@ mod test {
\nGROUP BY minute;",
format!(
"ProjectionExec: expr=[date_bin_gapfill({dbg_args})@0 as minute, AVG(temps.temp)@1 as AVG(temps.temp)]\
\n GapFillExec: group_expr=[date_bin_gapfill({dbg_args})@0], aggr_expr=[AVG(temps.temp)@1]\
\n GapFillExec: group_expr=[date_bin_gapfill({dbg_args})@0], aggr_expr=[AVG(temps.temp)@1], stride=60000\
\n SortExec: [date_bin_gapfill({dbg_args})@0 ASC]\
\n AggregateExec: mode=Final, gby=[date_bin_gapfill({dbg_args})@0 as date_bin_gapfill({dbg_args})], aggr=[AVG(temps.temp)]"
).as_str()
@ -406,7 +426,7 @@ mod test {
\nGROUP BY loc, minute, loczz;",
format!(
"ProjectionExec: expr=[loc@0 as loc, date_bin_gapfill({dbg_args})@1 as minute, concat(Utf8(\"zz\"),temps.loc)@2 as loczz, AVG(temps.temp)@3 as AVG(temps.temp)]\
\n GapFillExec: group_expr=[loc@0, date_bin_gapfill({dbg_args})@1, concat(Utf8(\"zz\"),temps.loc)@2], aggr_expr=[AVG(temps.temp)@3]\
\n GapFillExec: group_expr=[loc@0, date_bin_gapfill({dbg_args})@1, concat(Utf8(\"zz\"),temps.loc)@2], aggr_expr=[AVG(temps.temp)@3], stride=60000\
\n SortExec: [loc@0 ASC,concat(Utf8(\"zz\"),temps.loc)@2 ASC,date_bin_gapfill({dbg_args})@1 ASC]\
\n AggregateExec: mode=Final, gby=[loc@0 as loc, date_bin_gapfill({dbg_args})@1 as date_bin_gapfill({dbg_args}), concat(Utf8(\"zz\"),temps.loc)@2 as concat(Utf8(\"zz\"),temps.loc)], aggr=[AVG(temps.temp)]"
).as_str()

View File

@ -97,12 +97,23 @@ fn handle_aggregate(aggr: &Aggregate) -> Result<Option<LogicalPlan>> {
} = aggr;
// new_group_expr has DATE_BIN_GAPFILL replaced with DATE_BIN.
let (new_group_expr, dbg_idx) = if let Some(v) = replace_date_bin_gapfill(group_expr)? {
let RewriteInfo {
new_group_expr,
date_bin_gapfill_index,
date_bin_gapfill_args,
} = if let Some(v) = replace_date_bin_gapfill(group_expr)? {
v
} else {
return Ok(None);
};
if date_bin_gapfill_args.len() != 3 {
return Err(DataFusionError::Plan(format!(
"DATE_BIN_GAPFILL expects 3 arguments, got {}",
date_bin_gapfill_args.len()
)));
}
let new_aggr_plan = {
// Create the aggregate node with the same output schema as the orignal
// one. This means that there will be an output column called `date_bin_gapfill(...)`
@ -127,32 +138,52 @@ fn handle_aggregate(aggr: &Aggregate) -> Result<Option<LogicalPlan>> {
.map(|f| Expr::Column(f.qualified_column()))
.collect();
let aggr_expr = new_group_expr.split_off(group_expr.len());
let time_column = col(new_aggr_plan.schema().fields()[dbg_idx].qualified_column());
let time_column =
col(new_aggr_plan.schema().fields()[date_bin_gapfill_index].qualified_column());
let stride = date_bin_gapfill_args
.into_iter()
.next()
.expect("there are three args");
LogicalPlan::Extension(Extension {
node: Arc::new(GapFill::try_new(
Arc::new(new_aggr_plan),
new_group_expr,
aggr_expr,
time_column,
stride,
)?),
})
};
Ok(Some(new_gap_fill_plan))
}
struct RewriteInfo {
// Group expressions with DATE_BIN_GAPFILL rewritten to DATE_BIN.
new_group_expr: Vec<Expr>,
// The index of the group expression that contained the call to DATE_BIN_GAPFILL.
date_bin_gapfill_index: usize,
// The arguments to the call to DATE_BIN_GAPFILL.
date_bin_gapfill_args: Vec<Expr>,
}
// Iterate over the group expression list.
// If it finds no occurrences of date_bin_gapfill at the top of
// each expression tree, it will return None.
// If it finds such an occurrence, it will return a new expression list
// with the date_bin_gapfill replaced with date_bin, and the index of
// where the replacement occurred.
fn replace_date_bin_gapfill(group_expr: &[Expr]) -> Result<Option<(Vec<Expr>, usize)>> {
// If it finds no occurrences of date_bin_gapfill, it will return None.
// If it finds more than one occurrence it will return an error.
// Otherwise it will return a RewriteInfo for the optimizer rule to use.
fn replace_date_bin_gapfill(group_expr: &[Expr]) -> Result<Option<RewriteInfo>> {
let mut date_bin_gapfill_count = 0;
group_expr.iter().try_for_each(|e| -> Result<()> {
let fn_cnt = count_date_bin_gapfill(e)?;
date_bin_gapfill_count += fn_cnt;
Ok(())
})?;
let mut dbg_idx = None;
group_expr
.iter()
.enumerate()
.try_for_each(|(i, e)| -> Result<()> {
let fn_cnt = count_date_bin_gapfill(e)?;
date_bin_gapfill_count += fn_cnt;
if fn_cnt > 0 {
dbg_idx = Some(i);
}
Ok(())
})?;
match date_bin_gapfill_count {
0 => return Ok(None),
2.. => {
@ -162,30 +193,31 @@ fn replace_date_bin_gapfill(group_expr: &[Expr]) -> Result<Option<(Vec<Expr>, us
}
_ => (),
}
let date_bin_gapfill_index = dbg_idx.expect("should be found exactly one call");
let group_expr = group_expr.to_owned();
let mut new_group_expr = Vec::with_capacity(group_expr.len());
let mut dbg_idx = None;
group_expr
.into_iter()
let mut rewriter = DateBinGapfillRewriter { args: None };
let group_expr = group_expr
.iter()
.enumerate()
.try_for_each(|(i, e)| -> Result<()> {
let mut rewriter = DateBinGapfillRewriter { found: false };
new_group_expr.push(e.rewrite(&mut rewriter)?);
if rewriter.found {
dbg_idx = Some(i);
.map(|(i, e)| {
if i == date_bin_gapfill_index {
e.clone().rewrite(&mut rewriter)
} else {
Ok(e.clone())
}
Ok(())
})?;
Ok(Some((
new_group_expr,
dbg_idx.expect("should have found a call to DATE_BIN_GAPFILL based on previous check"),
)))
})
.collect::<Result<Vec<_>>>()?;
let date_bin_gapfill_args = rewriter.args.expect("should have found args");
Ok(Some(RewriteInfo {
new_group_expr: group_expr,
date_bin_gapfill_index,
date_bin_gapfill_args,
}))
}
struct DateBinGapfillRewriter {
found: bool,
args: Option<Vec<Expr>>,
}
impl ExprRewriter for DateBinGapfillRewriter {
@ -201,7 +233,7 @@ impl ExprRewriter for DateBinGapfillRewriter {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
match expr {
Expr::ScalarUDF { fun, args } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
self.found = true;
self.args = Some(args.clone());
Ok(Expr::ScalarFunction {
fun: BuiltinScalarFunction::DateBin,
args,
@ -373,7 +405,7 @@ mod test {
let dbg_args = "IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(0, None)";
let expected = format!(
"GapFill: groupBy=[[date_bin_gapfill({dbg_args})]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill({dbg_args})\
"GapFill: groupBy=[[date_bin_gapfill({dbg_args})]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill({dbg_args}), stride=IntervalDayTime(\"60000\")\
\n Aggregate: groupBy=[[datebin(IntervalDayTime(\"60000\"), temps.time, TimestampNanosecond(0, None))]], aggr=[[AVG(temps.temp)]]\
\n TableScan: temps");
assert_optimized_plan_eq(&plan, &expected)?;
@ -395,7 +427,7 @@ mod test {
let dbg_args = "IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(0, None)";
let expected = format!(
"GapFill: groupBy=[[date_bin_gapfill({dbg_args}), temps.loc]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill({dbg_args})\
"GapFill: groupBy=[[date_bin_gapfill({dbg_args}), temps.loc]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill({dbg_args}), stride=IntervalDayTime(\"60000\")\
\n Aggregate: groupBy=[[datebin(IntervalDayTime(\"60000\"), temps.time, TimestampNanosecond(0, None)), temps.loc]], aggr=[[AVG(temps.temp)]]\
\n TableScan: temps");
assert_optimized_plan_eq(&plan, &expected)?;
@ -439,7 +471,7 @@ mod test {
let expected = format!(
"Projection: date_bin_gapfill({dbg_args}), AVG(temps.temp)\
\n GapFill: groupBy=[[date_bin_gapfill({dbg_args})]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill({dbg_args})\
\n GapFill: groupBy=[[date_bin_gapfill({dbg_args})]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill({dbg_args}), stride=IntervalDayTime(\"60000\")\
\n Aggregate: groupBy=[[datebin(IntervalDayTime(\"60000\"), temps.time, TimestampNanosecond(0, None))]], aggr=[[AVG(temps.temp)]]\
\n TableScan: temps");
assert_optimized_plan_eq(&plan, &expected)?;

View File

@ -1,6 +1,12 @@
//! A lazy connector for Tonic gRPC [`Channel`] instances.
use std::{sync::Arc, time::Duration};
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::{
@ -14,6 +20,12 @@ use tonic::transport::{Channel, Endpoint};
use super::{client::WriteClient, RpcWriteError};
const RETRY_INTERVAL: Duration = Duration::from_secs(1);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// How many consecutive errors must be observed before opening a new connection
/// (at most once per [`RETRY_INTERVAL]).
const RECONNECT_ERROR_COUNT: usize = 10;
/// Lazy [`Channel`] connector.
///
@ -27,17 +39,33 @@ const RETRY_INTERVAL: Duration = Duration::from_secs(1);
pub struct LazyConnector {
addr: Endpoint,
connection: Arc<Mutex<Option<Channel>>>,
/// The number of request errors observed without a single success.
consecutive_errors: Arc<AtomicUsize>,
/// A task that periodically opens a new connection to `addr` when
/// `consecutive_errors` is more than [`RECONNECT_ERROR_COUNT`].
connection_task: JoinHandle<()>,
}
impl LazyConnector {
/// Lazily connect to `addr`.
pub fn new(addr: Endpoint) -> Self {
let addr = addr
.connect_timeout(CONNECT_TIMEOUT)
.timeout(REQUEST_TIMEOUT);
let connection = Default::default();
// Drive first connection by setting it above the connection limit.
let consecutive_errors = Arc::new(AtomicUsize::new(RECONNECT_ERROR_COUNT + 1));
Self {
addr: addr.clone(),
connection: Arc::clone(&connection),
connection_task: tokio::spawn(try_connect(addr, connection)),
connection_task: tokio::spawn(try_connect(
addr,
connection,
Arc::clone(&consecutive_errors),
)),
consecutive_errors,
}
}
@ -58,8 +86,16 @@ impl WriteClient for LazyConnector {
let conn =
conn.ok_or_else(|| RpcWriteError::UpstreamNotConnected(self.addr.uri().to_string()))?;
WriteServiceClient::new(conn).write(op).await?;
Ok(())
match WriteServiceClient::new(conn).write(op).await {
Err(e) => {
self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
return Err(e);
}
Ok(_) => {
self.consecutive_errors.store(0, Ordering::Relaxed);
Ok(())
}
}
}
}
@ -69,19 +105,25 @@ impl Drop for LazyConnector {
}
}
async fn try_connect(addr: Endpoint, connection: Arc<Mutex<Option<Channel>>>) {
async fn try_connect(
addr: Endpoint,
connection: Arc<Mutex<Option<Channel>>>,
consecutive_errors: Arc<AtomicUsize>,
) {
loop {
match addr.connect().await {
Ok(v) => {
info!(endpoint = %addr.uri(), "connected to upstream ingester");
*connection.lock() = Some(v);
return;
if consecutive_errors.load(Ordering::Relaxed) > RECONNECT_ERROR_COUNT {
match addr.connect().await {
Ok(v) => {
info!(endpoint = %addr.uri(), "connected to upstream ingester");
*connection.lock() = Some(v);
consecutive_errors.store(0, Ordering::Relaxed);
}
Err(e) => warn!(
endpoint = %addr.uri(),
error=%e,
"failed to connect to upstream ingester"
),
}
Err(e) => warn!(
endpoint = %addr.uri(),
error=%e,
"failed to connect to upstream ingester"
),
}
tokio::time::sleep(RETRY_INTERVAL).await;
}

View File

@ -7,8 +7,11 @@ license.workspace = true
[dependencies] # In alphabetical order
async-trait = "0.1.63"
arrow-flight = { workspace = true, features = ["flight-sql-experimental"] }
bytes = "1.3"
datafusion = { workspace = true }
iox_query = { path = "../iox_query" }
flightsql = { path = "../flightsql" }
metric = { path = "../metric" }
parking_lot = "0.12"
predicate = { path = "../predicate" }

View File

@ -1,7 +1,10 @@
//! Query planner wrapper for use in IOx services
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use arrow_flight::sql::Any;
use bytes::Bytes;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use flightsql::FlightSQLPlanner;
use iox_query::{
exec::IOxSessionContext,
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
@ -17,7 +20,7 @@ use predicate::rpc_predicate::InfluxRpcPredicate;
///
/// Query planning was, at time of writing, a single threaded
/// affair. In order to avoid tying up the tokio executor that is
/// handling API requests, we plan queries using a separate thread
/// handling API requests, IOx plan queries using a separate thread
/// pool.
pub struct Planner {
/// Executors (whose threadpool to use)
@ -60,6 +63,50 @@ impl Planner {
.await
}
/// Creates a plan for a `DoGet` FlightSQL message,
/// as described on [`FlightSQLPlanner::do_get`], on a
/// separate threadpool
pub async fn flight_sql_do_get<N>(
&self,
namespace_name: impl Into<String>,
namespace: Arc<N>,
msg: Any,
) -> Result<Arc<dyn ExecutionPlan>>
where
N: QueryNamespace + 'static,
{
let namespace_name = namespace_name.into();
let ctx = self.ctx.child_ctx("planner flight_sql_do_get");
self.ctx
.run(async move {
FlightSQLPlanner::do_get(namespace_name, namespace, msg, &ctx)
.await
.map_err(DataFusionError::from)
})
.await
}
/// Creates the response for a `GetFlightInfo` FlightSQL message
/// as described on [`FlightSQLPlanner::get_flight_info`], on a
/// separate threadpool.
pub async fn flight_sql_get_flight_info(
&self,
namespace_name: impl Into<String>,
msg: Any,
) -> Result<Bytes> {
let namespace_name = namespace_name.into();
let ctx = self.ctx.child_ctx("planner flight_sql_get_flight_info");
self.ctx
.run(async move {
FlightSQLPlanner::get_flight_info(namespace_name, msg, &ctx)
.await
.map_err(DataFusionError::from)
})
.await
}
/// Creates a plan as described on
/// [`InfluxRpcPlanner::table_names`], on a separate threadpool
pub async fn table_names<N>(

View File

@ -10,9 +10,9 @@ license.workspace = true
arrow_util = { path = "../arrow_util" }
data_types = { path = "../data_types" }
datafusion = { workspace = true }
flightsql = { path = "../flightsql" }
generated_types = { path = "../generated_types" }
observability_deps = { path = "../observability_deps" }
prost-types = { version = "0.11", features = ["std"] }
iox_query = { path = "../iox_query" }
service_common = { path = "../service_common" }
trace = { path = "../trace"}

View File

@ -1,4 +1,5 @@
//! Implements the InfluxDB IOx Flight API using Arrow Flight and gRPC
//! Implements the InfluxDB IOx Flight API and Arrow FlightSQL, based
//! on Arrow Flight and gRPC. See [`FlightService`] for full detail.
mod request;
@ -11,7 +12,7 @@ use arrow_flight::{
error::FlightError,
flight_descriptor::DescriptorType,
flight_service_server::{FlightService as Flight, FlightServiceServer as FlightServer},
sql::{CommandStatementQuery, ProstMessageExt},
sql::Any,
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket,
};
@ -28,7 +29,7 @@ use observability_deps::tracing::{debug, info, warn};
use prost::Message;
use request::{IoxGetRequest, RunQuery};
use service_common::{datafusion_error_to_tonic_code, planner::Planner, QueryNamespaceProvider};
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll, time::Instant};
use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt};
@ -51,11 +52,8 @@ pub enum Error {
#[snafu(display("Internal creating encoding ticket: {}", source))]
InternalCreatingTicket { source: request::Error },
#[snafu(display("Invalid query, could not parse '{}': {}", query, source))]
InvalidQuery {
query: String,
source: serde_json::Error,
},
#[snafu(display("Invalid handshake. No payload provided"))]
InvalidHandshake {},
#[snafu(display("Namespace {} not found", namespace_name))]
NamespaceNotFound { namespace_name: String },
@ -89,18 +87,12 @@ pub enum Error {
source: service_common::planner::Error,
},
#[snafu(display("Error during protobuf serialization: {}", source))]
Serialization { source: prost::EncodeError },
#[snafu(display("Error while planning FlightSQL : {}", source))]
FlightSQLPlanning { source: flightsql::Error },
#[snafu(display("Invalid protobuf: {}", source))]
Deserialization { source: prost::DecodeError },
#[snafu(display("Invalid protobuf for type_url'{}': {}", type_url, source))]
DeserializationTypeKnown {
type_url: String,
source: prost::DecodeError,
},
#[snafu(display("Unsupported message type: {}", description))]
UnsupportedMessageType { description: String },
}
@ -116,7 +108,7 @@ impl From<Error> for tonic::Status {
match err {
Error::NamespaceNotFound { .. }
| Error::InvalidTicket { .. }
| Error::InvalidQuery { .. }
| Error::InvalidHandshake { .. }
// TODO(edd): this should be `debug`. Keeping at info while IOx in early development
| Error::InvalidNamespaceName { .. } => info!(e=%err, msg),
Error::Query { .. } => info!(e=%err, msg),
@ -124,11 +116,10 @@ impl From<Error> for tonic::Status {
|Error::NoNamespaceHeader
|Error::InvalidNamespaceHeader { .. }
| Error::Planning { .. }
| Error::Serialization { .. }
| Error::Deserialization { .. }
| Error::DeserializationTypeKnown { .. }
| Error::InternalCreatingTicket { .. }
| Error::UnsupportedMessageType { .. }
| Error::FlightSQLPlanning { .. }
=> {
warn!(e=%err, msg)
}
@ -146,10 +137,8 @@ impl Error {
let code = match self {
Self::NamespaceNotFound { .. } => tonic::Code::NotFound,
Self::InvalidTicket { .. }
| Self::InvalidQuery { .. }
| Self::Serialization { .. }
| Self::InvalidHandshake { .. }
| Self::Deserialization { .. }
| Self::DeserializationTypeKnown { .. }
| Self::NoNamespaceHeader
| Self::InvalidNamespaceHeader { .. }
| Self::InvalidNamespaceName { .. } => tonic::Code::InvalidArgument,
@ -157,6 +146,18 @@ impl Error {
datafusion_error_to_tonic_code(&source)
}
Self::UnsupportedMessageType { .. } => tonic::Code::Unimplemented,
Error::FlightSQLPlanning { source } => match source {
flightsql::Error::DeserializationTypeKnown { .. }
| flightsql::Error::InvalidUtf8 { .. }
| flightsql::Error::UnsupportedMessageType { .. } => tonic::Code::InvalidArgument,
flightsql::Error::Flight { source: e } => return tonic::Status::from(e),
fs_err @ flightsql::Error::Arrow { .. } => {
// wrap in Datafusion error to walk source stacks
let df_error = DataFusionError::from(fs_err);
datafusion_error_to_tonic_code(&df_error)
}
flightsql::Error::DataFusion { source } => datafusion_error_to_tonic_code(&source),
},
Self::InternalCreatingTicket { .. } | Self::Optimize { .. } => tonic::Code::Internal,
};
@ -172,7 +173,105 @@ impl Error {
type TonicStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + 'static>>;
/// Concrete implementation of the gRPC Arrow Flight Service API
/// Concrete implementation of the IOx client protocol, implemented as
/// a gRPC [Arrow Flight] Service API
///
/// # Tickets
///
/// Creating and serializing the `Ticket` structure used in IOx Arrow
/// Flight API is handled by [`IoxGetRequest`]. See that for more
/// details.
///
/// # Native IOx API ad-hoc query
///
/// To run a query with the native IOx API, a client needs to
///
/// 1. Encode the query string as a `Ticket` (see [`IoxGetRequest`]).
///
/// 2. Call the `DoGet` method with the `Ticket`,
///
/// 2. Recieve a stream of data encoded as [`FlightData`]
///
/// ```text
/// .───────.
/// ╔═══════════╗ ( )
/// ║ ║ │`───────'│
/// ║ Client ║ │ IOx │
/// ║ ║ │.───────.│
/// ║ ║ ( )
/// ╚═══════════╝ `───────'
/// ┃ Creates a ┃
/// 1 ┃ Ticket ┃
/// ┃ ┃
/// ┃ ┃
/// 2 ┃ DoGet(Ticket) ┃
/// ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃
/// ┃ ┃
/// ┃ Stream of FightData ┃
/// 3 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃
/// ```
///
/// # FlightSQL
///
/// IOx also supports [Arrow FlightSQL]. In addition to `DoGet`,
/// FlightSQL clients call additional Arrow Flight RPC methods such as
/// `GetFlightInfo`, `GetSchema`, `DoPut`, and `DoAction`.
///
/// ## FlightSQL List tables (NOT YET IMPLEMENTED)
///
/// TODO sequence diagram for List Tables
///
/// ## FlightSQL ad-hoc query
///
/// To run an ad-hoc query, via FlightSQL, the client needs to
///
/// 1. Encode the query in a `CommandStatementQuery` FlightSQL
/// structure in a [`FlightDescriptor`]
///
/// 2. Call the `GetFlightInfo` method with the the [`FlightDescriptor`]
///
/// 3. Receive a `Ticket` in the returned [`FlightInfo`]. The Ticket is
/// opaque (uninterpreted) by the client. It contains an
/// [`IoxGetRequest`] with the `CommandStatementQuery` request.
///
/// 4. Calls the `DoGet` method with the `Ticket` from the previous step.
///
/// 5. Recieve a stream of data encoded as [`FlightData`]
///
/// ```text
/// .───────.
/// ╔═══════════╗ ( )
/// ║ ║ │`───────'│
/// ║ FlightSQL ║ │ IOx │
/// ║ Client ║ │.───────.│
/// ║ ║ ( )
/// ╚═══════════╝ `───────'
/// ┃ Creates a ┃
/// 1 ┃ CommandStatementQuery ┃
/// ┃ ┃
/// ┃ ┃
/// ┃ ┃
/// 2 ┃ GetFlightInfo(CommandStatementQuery) ┃
/// ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃
/// ┃ FlightInfo{..Ticket{ ┃
/// ┃ CommandStatementQuery ┃
/// 3 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃
/// ┃ ┃
/// ┃ ┃
/// ┃ DoGet(Ticket) ┃
/// 4 ┃━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━▶┃
/// ┃ ┃
/// ┃ Stream of FightData ┃
/// 5 ┃◀ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┃
/// ┃ ┃
/// ```
///
/// ## FlightSQL Prepared Statement (NOT YET IMPLEMENTED)
///
/// TODO sequence diagram
///
/// [Arrow Flight]: https://arrow.apache.org/docs/format/Flight.html
/// [Arrow FlightSQL]: https://arrow.apache.org/docs/format/FlightSql.html
#[derive(Debug)]
struct FlightService<S>
where
@ -192,7 +291,8 @@ impl<S> FlightService<S>
where
S: QueryNamespaceProvider,
{
async fn run_query(
/// Implementation of the `DoGet` method
async fn run_do_get(
&self,
span_ctx: Option<SpanContext>,
permit: InstrumentedAsyncOwnedSemaphorePermit,
@ -223,6 +323,14 @@ where
.context(PlanningSnafu)?;
(token, plan)
}
RunQuery::FlightSql(msg) => {
let token = db.record_query(&ctx, "flightsql", Box::new(msg.type_url.clone()));
let plan = Planner::new(&ctx)
.flight_sql_do_get(&namespace, db, msg.clone())
.await
.context(PlanningSnafu)?;
(token, plan)
}
};
let output =
@ -249,7 +357,9 @@ where
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
Err(tonic::Status::unimplemented(
"Not yet implemented: get_schema",
))
}
async fn do_get(
@ -279,17 +389,17 @@ where
// Log after we acquire the permit and are about to start execution
let start = Instant::now();
info!(%namespace_name, %query, %trace, "Running SQL via flight do_get");
info!(%namespace_name, %query, %trace, "DoGet request");
let response = self
.run_query(span_ctx, permit, query, namespace_name.to_string())
.run_do_get(span_ctx, permit, query, namespace_name.to_string())
.await;
if let Err(e) = &response {
info!(%namespace_name, %query, %trace, %e, "Error running SQL query");
info!(%namespace_name, %query, %trace, %e, "Error running DoGet");
} else {
let elapsed = Instant::now() - start;
debug!(%namespace_name, %query,%trace, ?elapsed, "Completed SQL query successfully");
debug!(%namespace_name, %query, %trace, ?elapsed, "Completed DoGet request");
}
response
}
@ -298,7 +408,12 @@ where
&self,
request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, tonic::Status> {
let request = request.into_inner().message().await?.unwrap();
let request = request
.into_inner()
.message()
.await?
.context(InvalidHandshakeSnafu)?;
let response = HandshakeResponse {
protocol_version: request.protocol_version,
payload: request.payload,
@ -311,20 +426,24 @@ where
&self,
_request: Request<Criteria>,
) -> Result<Response<Self::ListFlightsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
Err(tonic::Status::unimplemented(
"Not yet implemented: list_flights",
))
}
/// Handles requests encoded in the FlightDescriptor
///
/// IOx currently only processes "cmd" type Descriptors (not
/// paths) and attempts to decodes the [`FlightDescriptor::cmd`]
/// bytes as an encoded protobuf message
///
/// Handles `GetFlightInfo` RPC requests. The [`FlightDescriptor`]
/// is treated containing an FlightSQL command, encoded as a binary
/// ProtoBuf message.
///
/// see [`FlightService`] for more details.
async fn get_flight_info(
&self,
request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, tonic::Status> {
let external_span_ctx: Option<RequestLogContext> = request.extensions().get().cloned();
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let trace = external_span_ctx.format_jaeger();
// look for namespace information in headers
let namespace_name = request
.metadata()
@ -336,111 +455,43 @@ where
})
.ok_or(Error::NoNamespaceHeader)??;
let request = request.into_inner();
let flight_descriptor = request.into_inner();
let cmd = match request.r#type() {
DescriptorType::Cmd => Ok(&request.cmd),
DescriptorType::Path => Err(Error::unsupported_message_type("FlightInfo with Path")),
DescriptorType::Unknown => Err(Error::unsupported_message_type(
"FlightInfo of unknown type",
)),
}?;
// extract the FlightSQL message
let msg = msg_from_descriptor(flight_descriptor.clone())?;
let message: prost_types::Any =
prost::Message::decode(cmd.as_ref()).context(DeserializationSnafu)?;
let type_url = &msg.type_url;
info!(%namespace_name, %type_url, %trace, "GetFlightInfo request");
let flight_info = self.dispatch(&namespace_name, request, message).await?;
Ok(tonic::Response::new(flight_info))
}
async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
}
impl<S> FlightService<S>
where
S: QueryNamespaceProvider,
{
/// Given a successfully decoded protobuf *Any* message, handles
/// recognized messages (e.g those defined by FlightSQL) and
/// creates the appropriate FlightData response
///
/// Arguments
///
/// namespace_name: is the target namespace of the request
///
/// flight_descriptor: is the descriptor sent in the request (included in response)
///
/// msg is the `cmd` field of the flight descriptor decoded as a protobuf message
async fn dispatch(
&self,
namespace_name: &str,
flight_descriptor: FlightDescriptor,
msg: prost_types::Any,
) -> Result<FlightInfo> {
fn try_unpack<T: ProstMessageExt>(msg: &prost_types::Any) -> Result<Option<T>> {
// Does the type URL match?
if T::type_url() != msg.type_url {
return Ok(None);
}
// type matched, so try and decode
let m = prost::Message::decode(&*msg.value).context(DeserializationTypeKnownSnafu {
type_url: &msg.type_url,
let db = self
.server
.db(&namespace_name, span_ctx.child_span("get namespace"))
.await
.ok_or_else(|| {
tonic::Status::not_found(format!("Unknown namespace: {namespace_name}"))
})?;
Ok(Some(m))
}
// FlightSQL CommandStatementQuery
let (schema, ticket) = if let Some(cmd) = try_unpack::<CommandStatementQuery>(&msg)? {
let CommandStatementQuery { query } = cmd;
debug!(%namespace_name, %query, "Handling FlightSQL CommandStatementQuery");
let ctx = db.new_query_context(span_ctx);
let schema = Planner::new(&ctx)
.flight_sql_get_flight_info(&namespace_name, msg.clone())
.await
.context(PlanningSnafu);
// TODO is supposed to return a schema -- if clients
// actually expect the schema we'll have to plan the query
// here.
let schema = vec![];
// Create a ticket that can be passed to do_get to run the query
let ticket = IoxGetRequest::new(namespace_name, RunQuery::Sql(query))
.try_encode()
.context(InternalCreatingTicketSnafu)?;
(schema, ticket)
if let Err(e) = &schema {
info!(%namespace_name, %type_url, %trace, %e, "Error running GetFlightInfo");
} else {
return Err(Error::unsupported_message_type(format!(
"Unsupported cmd message: {}",
msg.type_url
)));
debug!(%namespace_name, %type_url, %trace, "Completed GetFlightInfo request");
};
let schema = schema?;
// form the response
// Form the response ticket (that the client will pass back to DoGet)
let ticket = IoxGetRequest::new(&namespace_name, RunQuery::FlightSql(msg))
.try_encode()
.context(InternalCreatingTicketSnafu)?;
// Arrow says "set to -1 if not known
// Flight says "Set these to -1 if unknown."
//
// https://github.com/apache/arrow-rs/blob/a0a5880665b1836890f6843b6b8772d81c463351/format/Flight.proto#L274-L276
let total_records = -1;
let total_bytes = -1;
@ -454,13 +505,65 @@ where
location: vec![],
}];
Ok(FlightInfo {
schema: schema.into(),
let flight_info = FlightInfo {
schema,
// return descriptor we were passed
flight_descriptor: Some(flight_descriptor),
endpoint,
total_records,
total_bytes,
})
};
Ok(tonic::Response::new(flight_info))
}
async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, tonic::Status> {
Err(tonic::Status::unimplemented("Not yet implemented: do_put"))
}
async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, tonic::Status> {
Err(tonic::Status::unimplemented(
"Not yet implemented: do_action",
))
}
async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, tonic::Status> {
Err(tonic::Status::unimplemented(
"Not yet implemented: list_actions",
))
}
async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, tonic::Status> {
Err(tonic::Status::unimplemented(
"Not yet implemented: do_exchange",
))
}
}
/// Extracts an encoded Protobuf message from a [`FlightDescriptor`],
/// as used in FlightSQL.
fn msg_from_descriptor(flight_descriptor: FlightDescriptor) -> Result<Any> {
match flight_descriptor.r#type() {
DescriptorType::Cmd => {
let msg: Any = Message::decode(flight_descriptor.cmd).context(DeserializationSnafu)?;
Ok(msg)
}
DescriptorType::Path => Err(Error::unsupported_message_type("FlightInfo with Path")),
DescriptorType::Unknown => Err(Error::unsupported_message_type(
"FlightInfo of unknown type",
)),
}
}

View File

@ -1,11 +1,11 @@
//! Ticket handling for the native IOx Flight API
use arrow_flight::Ticket;
use arrow_flight::{sql::Any, Ticket};
use bytes::Bytes;
use generated_types::influxdata::iox::querier::v1 as proto;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
use observability_deps::tracing::trace;
use prost::Message;
use prost::{DecodeError, Message};
use serde::Deserialize;
use snafu::Snafu;
use std::fmt::{Debug, Display, Formatter};
@ -17,24 +17,27 @@ pub enum Error {
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Flight requests to the IOx Flight DoGet endpoint contain a
/// serialized `Ticket` which describes the request.
/// This is the structure of the opaque tickets` used for requests to
/// IOx Flight DoGet endpoint
///
/// This structure encapsulates the deserialization (and eventual
/// serializing) logic for these requests
#[derive(Debug, PartialEq, Clone, Eq)]
/// This structure encapsulates the deserialization and serializion
/// logic for these requests
#[derive(Debug, PartialEq, Clone)]
pub struct IoxGetRequest {
namespace_name: String,
query: RunQuery,
}
#[derive(Debug, PartialEq, Clone, Eq)]
#[derive(Debug, PartialEq, Clone)]
pub enum RunQuery {
/// Unparameterized SQL query
Sql(String),
/// InfluxQL
InfluxQL(String),
// Coming Soon (Prepared Statement support)
/// Execute a FlightSQL command. The payload is an encoded
/// FlightSql Command*. message that was received at the
/// get_flight_info endpoint
FlightSql(Any),
}
impl Display for RunQuery {
@ -42,6 +45,7 @@ impl Display for RunQuery {
match self {
Self::Sql(s) => Display::fmt(s, f),
Self::InfluxQL(s) => Display::fmt(s, f),
Self::FlightSql(s) => write!(f, "FlightSql({})", s.type_url),
}
}
}
@ -58,11 +62,11 @@ impl IoxGetRequest {
/// try to decode a ReadInfo structure from a Token
pub fn try_decode(ticket: Ticket) -> Result<Self> {
// decode ticket
IoxGetRequest::decode_protobuf(&ticket.ticket)
IoxGetRequest::decode_protobuf(ticket.ticket.clone())
.or_else(|e| {
trace!(%e, ticket=%String::from_utf8_lossy(&ticket.ticket),
"Error decoding ticket as ProtoBuf, trying as JSON");
IoxGetRequest::decode_json(&ticket.ticket)
IoxGetRequest::decode_json(ticket.ticket.clone())
})
.map_err(|e| {
trace!(%e, "Error decoding ticket as JSON");
@ -82,15 +86,21 @@ impl IoxGetRequest {
namespace_name,
sql_query,
query_type: QueryType::Sql.into(),
flightsql_command: vec![],
},
RunQuery::InfluxQL(influxql) => proto::ReadInfo {
namespace_name,
// field name is misleading
sql_query: influxql,
query_type: QueryType::InfluxQl.into(),
flightsql_command: vec![],
},
RunQuery::FlightSql(flightsql_command) => proto::ReadInfo {
namespace_name,
sql_query: "".into(),
query_type: QueryType::FlightSqlMessage.into(),
flightsql_command: flightsql_command.encode_to_vec(),
},
RunQuery::InfluxQL(influxql) => {
proto::ReadInfo {
namespace_name,
// field name is misleading
sql_query: influxql,
query_type: QueryType::InfluxQl.into(),
}
}
};
let ticket = read_info.encode_to_vec();
@ -107,7 +117,7 @@ impl IoxGetRequest {
///
/// Go clients are unable to execute InfluxQL queries until the JSON structure is updated
/// accordingly.
fn decode_json(ticket: &[u8]) -> Result<Self, String> {
fn decode_json(ticket: Bytes) -> Result<Self, String> {
let json_str = String::from_utf8(ticket.to_vec()).map_err(|_| "Not UTF8".to_string())?;
#[derive(Deserialize, Debug)]
@ -128,21 +138,45 @@ impl IoxGetRequest {
})
}
fn decode_protobuf(ticket: &[u8]) -> Result<Self, prost::DecodeError> {
let read_info = proto::ReadInfo::decode(Bytes::from(ticket.to_vec()))?;
fn decode_protobuf(ticket: Bytes) -> Result<Self, DecodeError> {
let read_info = proto::ReadInfo::decode(ticket)?;
let query_type = read_info.query_type();
let proto::ReadInfo {
namespace_name,
sql_query,
query_type: _,
flightsql_command,
} = read_info;
Ok(Self {
namespace_name,
query: match query_type {
QueryType::Unspecified | QueryType::Sql => RunQuery::Sql(sql_query),
QueryType::InfluxQl => RunQuery::InfluxQL(sql_query),
QueryType::Unspecified | QueryType::Sql => {
if !flightsql_command.is_empty() {
return Err(DecodeError::new(
"QueryType::Sql contained non empty flightsql_command",
));
}
RunQuery::Sql(sql_query)
}
QueryType::InfluxQl => {
if !flightsql_command.is_empty() {
return Err(DecodeError::new(
"QueryType::InfluxQl contained non empty flightsql_command",
));
}
RunQuery::InfluxQL(sql_query)
}
QueryType::FlightSqlMessage => {
if !sql_query.is_empty() {
return Err(DecodeError::new(
"QueryType::FlightSqlMessage contained non empty sql_query",
));
}
let cmd = prost::Message::decode(flightsql_command.as_ref())?;
RunQuery::FlightSql(cmd)
}
},
})
}
@ -158,6 +192,7 @@ impl IoxGetRequest {
#[cfg(test)]
mod tests {
use arrow_flight::sql::CommandStatementQuery;
use assert_matches::assert_matches;
use generated_types::influxdata::iox::querier::v1::read_info::QueryType;
@ -192,6 +227,7 @@ mod tests {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Unspecified.into(),
flightsql_command: vec![],
});
// Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL
@ -206,6 +242,7 @@ mod tests {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Sql.into(),
flightsql_command: vec![],
});
let ri = IoxGetRequest::try_decode(ticket).unwrap();
@ -219,6 +256,7 @@ mod tests {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::InfluxQl.into(),
flightsql_command: vec![],
});
let ri = IoxGetRequest::try_decode(ticket).unwrap();
@ -231,7 +269,8 @@ mod tests {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".into(),
query_type: 3, // not a known query type
query_type: 42, // not a known query type
flightsql_command: vec![],
});
// Reverts to default (unspecified) for invalid query_type enumeration, and thus SQL
@ -240,6 +279,48 @@ mod tests {
assert_matches!(ri.query, RunQuery::Sql(query) => assert_eq!(query, "SELECT 1"));
}
#[test]
fn proto_ticket_decoding_sql_too_many_fields() {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::Sql.into(),
// can't have both sql_query and flightsql
flightsql_command: vec![1, 2, 3],
});
let e = IoxGetRequest::try_decode(ticket).unwrap_err();
assert_matches!(e, Error::Invalid);
}
#[test]
fn proto_ticket_decoding_influxql_too_many_fields() {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::InfluxQl.into(),
// can't have both sql_query and flightsql
flightsql_command: vec![1, 2, 3],
});
let e = IoxGetRequest::try_decode(ticket).unwrap_err();
assert_matches!(e, Error::Invalid);
}
#[test]
fn proto_ticket_decoding_flightsql_too_many_fields() {
let ticket = make_proto_ticket(&proto::ReadInfo {
namespace_name: "<foo>_<bar>".to_string(),
sql_query: "SELECT 1".to_string(),
query_type: QueryType::FlightSqlMessage.into(),
// can't have both sql_query and flightsql
flightsql_command: vec![1, 2, 3],
});
let e = IoxGetRequest::try_decode(ticket).unwrap_err();
assert_matches!(e, Error::Invalid);
}
#[test]
fn proto_ticket_decoding_error() {
let ticket = Ticket {
@ -279,6 +360,25 @@ mod tests {
assert_eq!(request, roundtripped)
}
#[test]
fn round_trip_flightsql() {
let cmd = Any::pack(&CommandStatementQuery {
query: "select * from foo".into(),
})
.unwrap();
let request = IoxGetRequest {
namespace_name: "foo_blarg".into(),
query: RunQuery::FlightSql(cmd),
};
let ticket = request.clone().try_encode().expect("encoding failed");
let roundtripped = IoxGetRequest::try_decode(ticket).expect("decode failed");
assert_eq!(request, roundtripped)
}
fn make_proto_ticket(read_info: &proto::ReadInfo) -> Ticket {
Ticket {
ticket: read_info.encode_to_vec().into(),