feat(flightsql): support `database`, `bucket`, and `bucket-name` as grpc header names (#7511)
* feat: support `database`, `bucket`, and `bucket-name` as grpc header names * chore: lint * chore: update doc to accept `database`, `bucket`, and `bucket-name` as parameter names * chore: update doc to only show `database` as the parameter name * refactor: consolidate header names into a const vec and update comments on database --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
171113b922
commit
fb6b3f66da
|
@ -2,19 +2,19 @@
|
|||
|
||||
InfluxDB IOx supports running SQL queries via [Apache Arrow Flight SQL](https://arrow.apache.org/docs/format/FlightSql.html)
|
||||
|
||||
You can use either a native FlightSQL client as well as JDBC / ODBC Flight SQL drivers
|
||||
You can use either a native FlightSQL client as well as JDBC / ODBC Flight SQL drivers
|
||||
|
||||
## JDBC:
|
||||
|
||||
To use the JDBC driver with IOx:
|
||||
|
||||
1. Download the driver by following the link from [Maven](https://mvnrepository.com/artifact/org.apache.arrow/flight-sql/10.0.1) or [Dremio](https://www.dremio.com/drivers/jdbc/)
|
||||
2. Use a jdbc conection of the format: `jdbc:arrow-flight-sql://hostname:port?useEncryption=false&iox-namespace-name=NAME`.
|
||||
2. Use a jdbc conection of the format: `jdbc:arrow-flight-sql://hostname:port?useEncryption=false&database=NAME`
|
||||
|
||||
`hostname:port` is the host / port on which the IOx query gRPC API is running (default port is 8082), and `NAME` is the namespace name (for example, `26f7e5a4b7be365b_917b97a92e883afc`)
|
||||
`hostname:port` is the host / port on which the IOx query gRPC API is running (default port is 8082), and `NAME` is the database name (for example, `26f7e5a4b7be365b_917b97a92e883afc`)
|
||||
|
||||
An example JDBC URL is:
|
||||
|
||||
```
|
||||
jdbc:arrow-flight-sql://localhost:8082?useEncryption=false&iox-namespace-name=26f7e5a4b7be365b_917b97a92e883afc
|
||||
jdbc:arrow-flight-sql://localhost:8082?useEncryption=false&database=26f7e5a4b7be365b_917b97a92e883afc
|
||||
```
|
||||
|
|
|
@ -1357,8 +1357,153 @@ async fn authz() {
|
|||
authz.close().await;
|
||||
}
|
||||
|
||||
/// Ensure that FligthSQL API supports the following grpc header names,
|
||||
/// in addition to the existing `iox-namespace-name`
|
||||
/// 1. database
|
||||
/// 2. bucket
|
||||
/// 3. bucket-name
|
||||
#[tokio::test]
|
||||
async fn flightsql_client_header_name_database() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let table_name = "the_table";
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let mut cluster = MiniCluster::create_shared2(database_url).await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!(
|
||||
"{table_name},tag1=A,tag2=B val=42i 123456\n\
|
||||
{table_name},tag1=A,tag2=C val=43i 123457"
|
||||
)),
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let mut client = flightsql_client_helper(state.cluster(), "database");
|
||||
|
||||
let stream = client.get_table_types().await.unwrap();
|
||||
let batches = collect_stream(stream).await;
|
||||
|
||||
insta::assert_yaml_snapshot!(
|
||||
batches_to_sorted_lines(&batches),
|
||||
@r###"
|
||||
---
|
||||
- +------------+
|
||||
- "| table_type |"
|
||||
- +------------+
|
||||
- "| BASE TABLE |"
|
||||
- "| VIEW |"
|
||||
- +------------+
|
||||
"###
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn flightsql_client_header_name_bucket() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let table_name = "the_table";
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let mut cluster = MiniCluster::create_shared2(database_url).await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!(
|
||||
"{table_name},tag1=A,tag2=B val=42i 123456\n\
|
||||
{table_name},tag1=A,tag2=C val=43i 123457"
|
||||
)),
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let mut client = flightsql_client_helper(state.cluster(), "bucket");
|
||||
|
||||
let stream = client.get_table_types().await.unwrap();
|
||||
let batches = collect_stream(stream).await;
|
||||
|
||||
insta::assert_yaml_snapshot!(
|
||||
batches_to_sorted_lines(&batches),
|
||||
@r###"
|
||||
---
|
||||
- +------------+
|
||||
- "| table_type |"
|
||||
- +------------+
|
||||
- "| BASE TABLE |"
|
||||
- "| VIEW |"
|
||||
- +------------+
|
||||
"###
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn flightsql_client_header_name_bucket_name() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let database_url = maybe_skip_integration!();
|
||||
|
||||
let table_name = "the_table";
|
||||
|
||||
// Set up the cluster ====================================
|
||||
let mut cluster = MiniCluster::create_shared2(database_url).await;
|
||||
|
||||
StepTest::new(
|
||||
&mut cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!(
|
||||
"{table_name},tag1=A,tag2=B val=42i 123456\n\
|
||||
{table_name},tag1=A,tag2=C val=43i 123457"
|
||||
)),
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
let mut client = flightsql_client_helper(state.cluster(), "bucket-name");
|
||||
|
||||
let stream = client.get_table_types().await.unwrap();
|
||||
let batches = collect_stream(stream).await;
|
||||
|
||||
insta::assert_yaml_snapshot!(
|
||||
batches_to_sorted_lines(&batches),
|
||||
@r###"
|
||||
---
|
||||
- +------------+
|
||||
- "| table_type |"
|
||||
- +------------+
|
||||
- "| BASE TABLE |"
|
||||
- "| VIEW |"
|
||||
- +------------+
|
||||
"###
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
],
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
/// Return a [`FlightSqlClient`] configured for use
|
||||
fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient {
|
||||
flightsql_client_helper(cluster, "iox-namespace-name")
|
||||
}
|
||||
|
||||
/// Helper function for fn `flightsql_client` that returns a [`FlightSqlClient`] configured for use
|
||||
fn flightsql_client_helper(cluster: &MiniCluster, header_name: &str) -> FlightSqlClient {
|
||||
let connection = cluster.querier().querier_grpc_connection();
|
||||
let (channel, _headers) = connection.into_grpc_connection().into_parts();
|
||||
|
||||
|
@ -1366,7 +1511,7 @@ fn flightsql_client(cluster: &MiniCluster) -> FlightSqlClient {
|
|||
|
||||
// Add namespace to client headers until it is fully supported by FlightSQL
|
||||
let namespace = cluster.namespace();
|
||||
client.add_header("iox-namespace-name", namespace).unwrap();
|
||||
client.add_header(header_name, namespace).unwrap();
|
||||
|
||||
client
|
||||
}
|
||||
|
|
|
@ -10,14 +10,15 @@ influxdb_iox -v
|
|||
|
||||
## Run the JDBC test
|
||||
|
||||
To run the JDBC test program, specify the target namespace in the JDBC URL:
|
||||
To run the JDBC test program, specify the target database in the JDBC URL:
|
||||
|
||||
```shell
|
||||
# run the jdbc client driver program, downloading the JDBC driver if needed
|
||||
./jdbc_client "jdbc:arrow-flight-sql://localhost:8082?useEncryption=false&iox-namespace-name=26f7e5a4b7be365b_917b97a92e883afc" query 'select * from cpu'
|
||||
./jdbc_client "jdbc:arrow-flight-sql://localhost:8082?useEncryption=false&database=26f7e5a4b7be365b_917b97a92e883afc" query 'select * from cpu'
|
||||
```
|
||||
|
||||
# Cleanup:
|
||||
|
||||
Clean up any intermediate files (like JDBC driver)
|
||||
|
||||
```shell
|
||||
|
|
|
@ -37,12 +37,17 @@ use trace::{ctx::SpanContext, span::SpanExt};
|
|||
use trace_http::ctx::{RequestLogContext, RequestLogContextExt};
|
||||
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
|
||||
|
||||
/// The name of the grpc header that contains the target iox namespace
|
||||
/// name for FlightSQL requests.
|
||||
/// The supported names of the grpc header that contain the target database
|
||||
/// for FlightSQL requests.
|
||||
///
|
||||
/// See <https://lists.apache.org/thread/fd6r1n7vt91sg2c7fr35wcrsqz6x4645>
|
||||
/// for discussion on adding support to FlightSQL itself.
|
||||
const IOX_FLIGHT_SQL_NAMESPACE_HEADER: &str = "iox-namespace-name";
|
||||
const IOX_FLIGHT_SQL_DATABASE_HEADERS: [&str; 4] = [
|
||||
"database", // preferred
|
||||
"bucket",
|
||||
"bucket-name",
|
||||
"iox-namespace-name", // deprecated
|
||||
];
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
#[derive(Debug, Snafu)]
|
||||
|
@ -69,9 +74,24 @@ pub enum Error {
|
|||
source: DataFusionError,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"More than one headers are found in request: {:?}. \
|
||||
Please include only one of them",
|
||||
header_names
|
||||
))]
|
||||
TooManyFlightSQLDatabases { header_names: Vec<String> },
|
||||
|
||||
#[snafu(display("no 'iox-namespace-name' header in request"))]
|
||||
NoFlightSQLNamespace,
|
||||
|
||||
#[snafu(display(
|
||||
"Invalid 'database' or 'bucket' or 'bucket-name' header in request: {}",
|
||||
source
|
||||
))]
|
||||
InvalidDatabaseHeader {
|
||||
source: tonic::metadata::errors::ToStrError,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid 'iox-namespace-name' header in request: {}", source))]
|
||||
InvalidNamespaceHeader {
|
||||
source: tonic::metadata::errors::ToStrError,
|
||||
|
@ -125,7 +145,9 @@ impl From<Error> for tonic::Status {
|
|||
| Error::InvalidNamespaceName { .. } => info!(e=%err, msg),
|
||||
Error::Query { .. } => info!(e=%err, msg),
|
||||
Error::Optimize { .. }
|
||||
| Error::TooManyFlightSQLDatabases { .. }
|
||||
| Error::NoFlightSQLNamespace
|
||||
| Error::InvalidDatabaseHeader { .. }
|
||||
| Error::InvalidNamespaceHeader { .. }
|
||||
| Error::Planning { .. }
|
||||
| Error::Deserialization { .. }
|
||||
|
@ -152,7 +174,9 @@ impl Error {
|
|||
Self::InvalidTicket { .. }
|
||||
| Self::InvalidHandshake { .. }
|
||||
| Self::Deserialization { .. }
|
||||
| Self::TooManyFlightSQLDatabases { .. }
|
||||
| Self::NoFlightSQLNamespace
|
||||
| Self::InvalidDatabaseHeader { .. }
|
||||
| Self::InvalidNamespaceHeader { .. }
|
||||
| Self::InvalidNamespaceName { .. } => tonic::Code::InvalidArgument,
|
||||
Self::Planning { source, .. } | Self::Query { source, .. } => {
|
||||
|
@ -709,10 +733,41 @@ fn cmd_from_descriptor(flight_descriptor: FlightDescriptor) -> Result<FlightSQLC
|
|||
}
|
||||
}
|
||||
|
||||
/// Figure out the namespace for this request by checking
|
||||
/// the "iox-namespace-name=the_name";
|
||||
/// Figure out the database for this request by checking
|
||||
/// the "database=database_or_bucket_name" (preferred)
|
||||
/// or "bucket=database_or_bucket_name"
|
||||
/// or "bucket-name=database_or_bucket_name"
|
||||
/// or "iox-namespace-name=the_name" (deprecated);
|
||||
///
|
||||
/// Only one of the keys is accepted.
|
||||
///
|
||||
/// Note that `iox-namespace-name` is still accepted (rather than error) for
|
||||
/// some period of time until we are sure that all other software speaking
|
||||
/// FlightSQL is using the new header names.
|
||||
fn get_flightsql_namespace(metadata: &MetadataMap) -> Result<String> {
|
||||
if let Some(v) = metadata.get(IOX_FLIGHT_SQL_NAMESPACE_HEADER) {
|
||||
let mut found_header_keys: Vec<String> = vec![];
|
||||
for key in IOX_FLIGHT_SQL_DATABASE_HEADERS {
|
||||
if metadata.contains_key(key) {
|
||||
found_header_keys.push(key.to_string());
|
||||
}
|
||||
}
|
||||
if found_header_keys.len() > 1 {
|
||||
return TooManyFlightSQLDatabasesSnafu {
|
||||
header_names: found_header_keys,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
if let Some(v) = metadata.get(IOX_FLIGHT_SQL_DATABASE_HEADERS[0]) {
|
||||
let v = v.to_str().context(InvalidDatabaseHeaderSnafu)?;
|
||||
return Ok(v.to_string());
|
||||
} else if let Some(v) = metadata.get(IOX_FLIGHT_SQL_DATABASE_HEADERS[1]) {
|
||||
let v = v.to_str().context(InvalidDatabaseHeaderSnafu)?;
|
||||
return Ok(v.to_string());
|
||||
} else if let Some(v) = metadata.get(IOX_FLIGHT_SQL_DATABASE_HEADERS[2]) {
|
||||
let v = v.to_str().context(InvalidDatabaseHeaderSnafu)?;
|
||||
return Ok(v.to_string());
|
||||
} else if let Some(v) = metadata.get(IOX_FLIGHT_SQL_DATABASE_HEADERS[3]) {
|
||||
let v = v.to_str().context(InvalidNamespaceHeaderSnafu)?;
|
||||
return Ok(v.to_string());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue