refactor(flightsql): Use upstream `SqlInfoData` implementation (#8453)

* refactor(flightsql): Use upstream implementation

* fix: naming

* fix: better API
pull/24376/head
Andrew Lamb 2023-08-09 12:06:43 -05:00 committed by GitHub
parent 7a462754e4
commit d26850390f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 137 additions and 487 deletions

View File

@ -29,7 +29,7 @@ use observability_deps::tracing::debug;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use prost::Message; use prost::Message;
use crate::{error::*, sql_info::iox_sql_info_list, xdbc_type_info::xdbc_type_info_data}; use crate::{error::*, sql_info::iox_sql_info_data, xdbc_type_info::xdbc_type_info_data};
use crate::{FlightSQLCommand, PreparedStatementHandle}; use crate::{FlightSQLCommand, PreparedStatementHandle};
/// Logic for creating plans for various Flight messages against a query database /// Logic for creating plans for various Flight messages against a query database
@ -58,7 +58,7 @@ impl FlightSQLPlanner {
get_schema_for_query(handle.query(), ctx).await get_schema_for_query(handle.query(), ctx).await
} }
FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { .. }) => { FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { .. }) => {
Ok(iox_sql_info_list().schema()) Ok(iox_sql_info_data().schema())
} }
FlightSQLCommand::CommandGetCatalogs(req) => Ok(req.into_builder().schema()), FlightSQLCommand::CommandGetCatalogs(req) => Ok(req.into_builder().schema()),
FlightSQLCommand::CommandGetCrossReference(CommandGetCrossReference { .. }) => { FlightSQLCommand::CommandGetCrossReference(CommandGetCrossReference { .. }) => {
@ -110,9 +110,9 @@ impl FlightSQLPlanner {
debug!(%query, "Planning FlightSQL prepared query"); debug!(%query, "Planning FlightSQL prepared query");
Ok(ctx.sql_to_physical_plan(query).await?) Ok(ctx.sql_to_physical_plan(query).await?)
} }
FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { info }) => { FlightSQLCommand::CommandGetSqlInfo(cmd) => {
debug!("Planning GetSqlInfo query"); debug!(?cmd, "Planning GetSqlInfo query");
let plan = plan_get_sql_info(ctx, info).await?; let plan = plan_get_sql_info(ctx, cmd).await?;
Ok(ctx.create_physical_plan(&plan).await?) Ok(ctx.create_physical_plan(&plan).await?)
} }
FlightSQLCommand::CommandGetCatalogs(cmd) => { FlightSQLCommand::CommandGetCatalogs(cmd) => {
@ -307,10 +307,8 @@ fn encode_schema(schema: &Schema) -> Result<Bytes> {
} }
/// Return a `LogicalPlan` for GetSqlInfo /// Return a `LogicalPlan` for GetSqlInfo
/// async fn plan_get_sql_info(ctx: &IOxSessionContext, cmd: CommandGetSqlInfo) -> Result<LogicalPlan> {
/// The infos are passed directly from the [`CommandGetSqlInfo::info`] let batch = cmd.into_builder(iox_sql_info_data()).build()?;
async fn plan_get_sql_info(ctx: &IOxSessionContext, info: Vec<u32>) -> Result<LogicalPlan> {
let batch = iox_sql_info_list().filter(&info).encode()?;
Ok(ctx.batch_to_logical_plan(batch)?) Ok(ctx.batch_to_logical_plan(batch)?)
} }

View File

@ -1,6 +1,3 @@
//! TODO: use version in flight-sql when
//! <https://github.com/apache/arrow-rs/pull/4266> is available
//!
//! Represents the response to FlightSQL `GetSqlInfo` requests and //! Represents the response to FlightSQL `GetSqlInfo` requests and
//! handles the conversion to/from the format specified in the //! handles the conversion to/from the format specified in the
//! [Arrow FlightSQL Specification]. //! [Arrow FlightSQL Specification].
@ -22,17 +19,9 @@
//! [Arrow FlightSQL Specification]: https://github.com/apache/arrow/blob/f1eece9f276184063c9c35011e8243eb3b071233/format/FlightSql.proto#L33-L42 //! [Arrow FlightSQL Specification]: https://github.com/apache/arrow/blob/f1eece9f276184063c9c35011e8243eb3b071233/format/FlightSql.proto#L33-L42
mod meta; mod meta;
mod value;
use crate::error::Result;
use std::{borrow::Cow, collections::BTreeMap, sync::Arc};
use arrow::{
array::UInt32Builder,
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use arrow_flight::sql::{ use arrow_flight::sql::{
metadata::{SqlInfoData, SqlInfoDataBuilder},
SqlInfo, SqlNullOrdering, SqlSupportedCaseSensitivity, SqlSupportedTransactions, SqlInfo, SqlNullOrdering, SqlSupportedCaseSensitivity, SqlSupportedTransactions,
SupportedSqlGrammar, SupportedSqlGrammar,
}; };
@ -42,82 +31,9 @@ use meta::{
SQL_INFO_DATE_TIME_FUNCTIONS, SQL_INFO_NUMERIC_FUNCTIONS, SQL_INFO_SQL_KEYWORDS, SQL_INFO_DATE_TIME_FUNCTIONS, SQL_INFO_NUMERIC_FUNCTIONS, SQL_INFO_SQL_KEYWORDS,
SQL_INFO_STRING_FUNCTIONS, SQL_INFO_SYSTEM_FUNCTIONS, SQL_INFO_STRING_FUNCTIONS, SQL_INFO_SYSTEM_FUNCTIONS,
}; };
use value::{SqlInfoName, SqlInfoUnionBuilder, SqlInfoValue};
/// A list of SQL info names and valies
#[derive(Debug, Clone, PartialEq)]
pub struct SqlInfoList {
/// Use BTreeMap to ensure the values are sorted by value as
/// to make output consistent
///
/// Use u32 to support "custom" sql info values that are not
/// part of the SqlInfo enum
infos: BTreeMap<u32, SqlInfoValue>,
}
impl SqlInfoList {
pub fn new() -> Self {
Self {
infos: BTreeMap::new(),
}
}
/// register the specific sql metadata item
fn with_sql_info(mut self, name: impl SqlInfoName, value: impl Into<SqlInfoValue>) -> Self {
self.infos.insert(name.as_u32(), value.into());
self
}
/// Filter this info list keeping only the info values specified
/// in `infos`.
///
/// Returns self if infos is empty (no filtering)
pub fn filter(&self, info: &[u32]) -> Cow<'_, Self> {
if info.is_empty() {
Cow::Borrowed(self)
} else {
let infos: BTreeMap<_, _> = info
.iter()
.filter_map(|name| self.infos.get(name).map(|v| (*name, v.clone())))
.collect();
Cow::Owned(Self { infos })
}
}
/// Encode the contents of this info list according to the FlightSQL spec
pub fn encode(&self) -> Result<RecordBatch> {
let mut name_builder = UInt32Builder::new();
let mut value_builder = SqlInfoUnionBuilder::new();
for (&name, value) in &self.infos {
name_builder.append_value(name);
value_builder.append_value(value)
}
let batch = RecordBatch::try_from_iter(vec![
("info_name", Arc::new(name_builder.finish()) as _),
("value", Arc::new(value_builder.finish()) as _),
])?;
Ok(batch)
}
/// Return the schema for the record batches produced
pub fn schema(&self) -> SchemaRef {
// It is always the same
Arc::clone(&SCHEMA)
}
}
// The schema produced by [`SqlInfoList`]
static SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
Arc::new(Schema::new(vec![
Field::new("info_name", DataType::UInt32, false),
Field::new("value", SqlInfoUnionBuilder::schema().clone(), false),
]))
});
#[allow(non_snake_case)] #[allow(non_snake_case)]
static INSTANCE: Lazy<SqlInfoList> = Lazy::new(|| { static INSTANCE: Lazy<SqlInfoData> = Lazy::new(|| {
// The following are not defined in the [`SqlInfo`], but are // The following are not defined in the [`SqlInfo`], but are
// documented at // documented at
// https://arrow.apache.org/docs/format/FlightSql.html#protocol-buffer-definitions. // https://arrow.apache.org/docs/format/FlightSql.html#protocol-buffer-definitions.
@ -133,164 +49,135 @@ static INSTANCE: Lazy<SqlInfoList> = Lazy::new(|| {
// Copied from https://github.com/influxdata/idpe/blob/85aa7a52b40f173cc4d79ac02b3a4a13e82333c4/queryrouter/internal/server/flightsql_handler.go#L208-L275 // Copied from https://github.com/influxdata/idpe/blob/85aa7a52b40f173cc4d79ac02b3a4a13e82333c4/queryrouter/internal/server/flightsql_handler.go#L208-L275
SqlInfoList::new() let mut builder = SqlInfoDataBuilder::new();
// Server information
.with_sql_info(SqlInfo::FlightSqlServerName, "InfluxDB IOx") // Server information
.with_sql_info(SqlInfo::FlightSqlServerVersion, "2") builder.append(SqlInfo::FlightSqlServerName, "InfluxDB IOx");
// 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24 builder.append(SqlInfo::FlightSqlServerVersion, "2");
.with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3") // 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24
.with_sql_info(SqlInfo::FlightSqlServerReadOnly, true) builder.append(SqlInfo::FlightSqlServerArrowVersion, "1.3");
.with_sql_info(SqlInfoFlightSqlServerSql, true) builder.append(SqlInfo::FlightSqlServerReadOnly, true);
.with_sql_info(SqlInfoFlightSqlServerSubstrait, false) builder.append(SqlInfoFlightSqlServerSql, true);
.with_sql_info( builder.append(SqlInfoFlightSqlServerSubstrait, false);
SqlInfoFlightSqlServerTransaction, builder.append(
SqlSupportedTransactions::SqlTransactionUnspecified as i32, SqlInfoFlightSqlServerTransaction,
) SqlSupportedTransactions::SqlTransactionUnspecified as i32,
// don't yetsupport `CancelQuery` action );
.with_sql_info(SqlInfoFlightSqlServerCancel, false) // don't yetsupport `CancelQuery` action
.with_sql_info(SqlInfoFlightSqlServerStatementTimeout, 0i32) builder.append(SqlInfoFlightSqlServerCancel, false);
.with_sql_info(SqlInfoFlightSqlServerTransactionTimeout, 0i32) builder.append(SqlInfoFlightSqlServerStatementTimeout, 0i32);
// SQL syntax information builder.append(SqlInfoFlightSqlServerTransactionTimeout, 0i32);
.with_sql_info(SqlInfo::SqlDdlCatalog, false) // SQL syntax information
.with_sql_info(SqlInfo::SqlDdlSchema, false) builder.append(SqlInfo::SqlDdlCatalog, false);
.with_sql_info(SqlInfo::SqlDdlTable, false) builder.append(SqlInfo::SqlDdlSchema, false);
.with_sql_info( builder.append(SqlInfo::SqlDdlTable, false);
SqlInfo::SqlIdentifierCase, builder.append(
SqlSupportedCaseSensitivity::SqlCaseSensitivityLowercase as i32, SqlInfo::SqlIdentifierCase,
) SqlSupportedCaseSensitivity::SqlCaseSensitivityLowercase as i32,
.with_sql_info(SqlInfo::SqlIdentifierQuoteChar, r#"""#) );
.with_sql_info( builder.append(SqlInfo::SqlIdentifierQuoteChar, r#"""#);
SqlInfo::SqlQuotedIdentifierCase, builder.append(
SqlSupportedCaseSensitivity::SqlCaseSensitivityCaseInsensitive as i32, SqlInfo::SqlQuotedIdentifierCase,
) SqlSupportedCaseSensitivity::SqlCaseSensitivityCaseInsensitive as i32,
.with_sql_info(SqlInfo::SqlAllTablesAreSelectable, true) );
.with_sql_info( builder.append(SqlInfo::SqlAllTablesAreSelectable, true);
SqlInfo::SqlNullOrdering, builder.append(
SqlNullOrdering::SqlNullsSortedHigh as i32, SqlInfo::SqlNullOrdering,
) SqlNullOrdering::SqlNullsSortedHigh as i32,
.with_sql_info(SqlInfo::SqlKeywords, SQL_INFO_SQL_KEYWORDS) );
.with_sql_info(SqlInfo::SqlNumericFunctions, SQL_INFO_NUMERIC_FUNCTIONS) builder.append(SqlInfo::SqlKeywords, SQL_INFO_SQL_KEYWORDS);
.with_sql_info(SqlInfo::SqlStringFunctions, SQL_INFO_STRING_FUNCTIONS) builder.append(SqlInfo::SqlNumericFunctions, SQL_INFO_NUMERIC_FUNCTIONS);
.with_sql_info(SqlInfo::SqlSystemFunctions, SQL_INFO_SYSTEM_FUNCTIONS) builder.append(SqlInfo::SqlStringFunctions, SQL_INFO_STRING_FUNCTIONS);
.with_sql_info(SqlInfo::SqlDatetimeFunctions, SQL_INFO_DATE_TIME_FUNCTIONS) builder.append(SqlInfo::SqlSystemFunctions, SQL_INFO_SYSTEM_FUNCTIONS);
.with_sql_info(SqlInfo::SqlSearchStringEscape, "\\") builder.append(SqlInfo::SqlDatetimeFunctions, SQL_INFO_DATE_TIME_FUNCTIONS);
.with_sql_info(SqlInfo::SqlExtraNameCharacters, "") builder.append(SqlInfo::SqlSearchStringEscape, "\\");
.with_sql_info(SqlInfo::SqlSupportsColumnAliasing, true) builder.append(SqlInfo::SqlExtraNameCharacters, "");
.with_sql_info(SqlInfo::SqlNullPlusNullIsNull, true) builder.append(SqlInfo::SqlSupportsColumnAliasing, true);
// Skip SqlSupportsConvert (which is the map of the conversions that are supported) builder.append(SqlInfo::SqlNullPlusNullIsNull, true);
// .with_sql_info(SqlInfo::SqlSupportsConvert, TBD) // Skip SqlSupportsConvert (which is the map of the conversions that are supported);
// https://github.com/influxdata/influxdb_iox/issues/7253 // .with_sql_info(SqlInfo::SqlSupportsConvert, TBD);
.with_sql_info(SqlInfo::SqlSupportsTableCorrelationNames, false) // https://github.com/influxdata/influxdb_iox/issues/7253
.with_sql_info(SqlInfo::SqlSupportsDifferentTableCorrelationNames, false) builder.append(SqlInfo::SqlSupportsTableCorrelationNames, false);
.with_sql_info(SqlInfo::SqlSupportsExpressionsInOrderBy, true) builder.append(SqlInfo::SqlSupportsDifferentTableCorrelationNames, false);
.with_sql_info(SqlInfo::SqlSupportsOrderByUnrelated, true) builder.append(SqlInfo::SqlSupportsExpressionsInOrderBy, true);
.with_sql_info(SqlInfo::SqlSupportedGroupBy, 3i32) builder.append(SqlInfo::SqlSupportsOrderByUnrelated, true);
.with_sql_info(SqlInfo::SqlSupportsLikeEscapeClause, true) builder.append(SqlInfo::SqlSupportedGroupBy, 3i32);
.with_sql_info(SqlInfo::SqlSupportsNonNullableColumns, true) builder.append(SqlInfo::SqlSupportsLikeEscapeClause, true);
.with_sql_info( builder.append(SqlInfo::SqlSupportsNonNullableColumns, true);
SqlInfo::SqlSupportedGrammar, builder.append(
SupportedSqlGrammar::SqlCoreGrammar as i32, SqlInfo::SqlSupportedGrammar,
) SupportedSqlGrammar::SqlCoreGrammar as i32,
// report IOx supports all ansi 92 );
.with_sql_info(SqlInfo::SqlAnsi92SupportedLevel, 0b111_i32) // report IOx supports all ansi 92
.with_sql_info(SqlInfo::SqlSupportsIntegrityEnhancementFacility, false) builder.append(SqlInfo::SqlAnsi92SupportedLevel, 0b111_i32);
.with_sql_info(SqlInfo::SqlOuterJoinsSupportLevel, 2i32) builder.append(SqlInfo::SqlSupportsIntegrityEnhancementFacility, false);
.with_sql_info(SqlInfo::SqlSchemaTerm, "schema") builder.append(SqlInfo::SqlOuterJoinsSupportLevel, 2i32);
.with_sql_info(SqlInfo::SqlProcedureTerm, "procedure") builder.append(SqlInfo::SqlSchemaTerm, "schema");
.with_sql_info(SqlInfo::SqlCatalogAtStart, false) builder.append(SqlInfo::SqlProcedureTerm, "procedure");
.with_sql_info(SqlInfo::SqlSchemasSupportedActions, 0i32) builder.append(SqlInfo::SqlCatalogAtStart, false);
.with_sql_info(SqlInfo::SqlCatalogsSupportedActions, 0i32) builder.append(SqlInfo::SqlSchemasSupportedActions, 0i32);
.with_sql_info(SqlInfo::SqlSupportedPositionedCommands, 0i32) builder.append(SqlInfo::SqlCatalogsSupportedActions, 0i32);
.with_sql_info(SqlInfo::SqlSelectForUpdateSupported, false) builder.append(SqlInfo::SqlSupportedPositionedCommands, 0i32);
.with_sql_info(SqlInfo::SqlStoredProceduresSupported, false) builder.append(SqlInfo::SqlSelectForUpdateSupported, false);
.with_sql_info(SqlInfo::SqlSupportedSubqueries, 15i32) builder.append(SqlInfo::SqlStoredProceduresSupported, false);
.with_sql_info(SqlInfo::SqlCorrelatedSubqueriesSupported, true) builder.append(SqlInfo::SqlSupportedSubqueries, 15i32);
.with_sql_info(SqlInfo::SqlSupportedUnions, 3i32) builder.append(SqlInfo::SqlCorrelatedSubqueriesSupported, true);
// For max lengths, report max arrow string length (IOx builder.append(SqlInfo::SqlSupportedUnions, 3i32);
// doesn't enfore many of these limits yet // For max lengths, report max arrow string length (IOx
.with_sql_info(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64) // doesn't enfore many of these limits yet
.with_sql_info(SqlInfo::SqlMaxCharLiteralLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxColumnNameLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxCharLiteralLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxColumnsInGroupBy, i32::MAX as i64) builder.append(SqlInfo::SqlMaxColumnNameLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxColumnsInIndex, i32::MAX as i64) builder.append(SqlInfo::SqlMaxColumnsInGroupBy, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxColumnsInOrderBy, i32::MAX as i64) builder.append(SqlInfo::SqlMaxColumnsInIndex, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxColumnsInSelect, i32::MAX as i64) builder.append(SqlInfo::SqlMaxColumnsInOrderBy, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxColumnsInTable, i32::MAX as i64) builder.append(SqlInfo::SqlMaxColumnsInSelect, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxConnections, i32::MAX as i64) builder.append(SqlInfo::SqlMaxColumnsInTable, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxCursorNameLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxConnections, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxIndexLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxCursorNameLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlDbSchemaNameLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxIndexLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxProcedureNameLength, i32::MAX as i64) builder.append(SqlInfo::SqlDbSchemaNameLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxCatalogNameLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxProcedureNameLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxRowSize, i32::MAX as i64) builder.append(SqlInfo::SqlMaxCatalogNameLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxRowSizeIncludesBlobs, true) builder.append(SqlInfo::SqlMaxRowSize, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxStatementLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxRowSizeIncludesBlobs, true);
.with_sql_info(SqlInfo::SqlMaxStatements, i32::MAX as i64) builder.append(SqlInfo::SqlMaxStatementLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxTableNameLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxStatements, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxTablesInSelect, i32::MAX as i64) builder.append(SqlInfo::SqlMaxTableNameLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlMaxUsernameLength, i32::MAX as i64) builder.append(SqlInfo::SqlMaxTablesInSelect, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlDefaultTransactionIsolation, 0i64) builder.append(SqlInfo::SqlMaxUsernameLength, i32::MAX as i64);
.with_sql_info(SqlInfo::SqlTransactionsSupported, false) builder.append(SqlInfo::SqlDefaultTransactionIsolation, 0i64);
.with_sql_info(SqlInfo::SqlSupportedTransactionsIsolationLevels, 0i32) builder.append(SqlInfo::SqlTransactionsSupported, false);
.with_sql_info(SqlInfo::SqlDataDefinitionCausesTransactionCommit, false) builder.append(SqlInfo::SqlSupportedTransactionsIsolationLevels, 0i32);
.with_sql_info(SqlInfo::SqlDataDefinitionsInTransactionsIgnored, true) builder.append(SqlInfo::SqlDataDefinitionCausesTransactionCommit, false);
.with_sql_info(SqlInfo::SqlSupportedResultSetTypes, 0i32) builder.append(SqlInfo::SqlDataDefinitionsInTransactionsIgnored, true);
.with_sql_info( builder.append(SqlInfo::SqlSupportedResultSetTypes, 0i32);
SqlInfo::SqlSupportedConcurrenciesForResultSetUnspecified, builder.append(
0i32, SqlInfo::SqlSupportedConcurrenciesForResultSetUnspecified,
) 0i32,
.with_sql_info( );
SqlInfo::SqlSupportedConcurrenciesForResultSetForwardOnly, builder.append(
0i32, SqlInfo::SqlSupportedConcurrenciesForResultSetForwardOnly,
) 0i32,
.with_sql_info( );
SqlInfo::SqlSupportedConcurrenciesForResultSetScrollSensitive, builder.append(
0i32, SqlInfo::SqlSupportedConcurrenciesForResultSetScrollSensitive,
) 0i32,
.with_sql_info( );
SqlInfo::SqlSupportedConcurrenciesForResultSetScrollInsensitive, builder.append(
0i32, SqlInfo::SqlSupportedConcurrenciesForResultSetScrollInsensitive,
) 0i32,
.with_sql_info(SqlInfo::SqlBatchUpdatesSupported, false) );
.with_sql_info(SqlInfo::SqlSavepointsSupported, false) builder.append(SqlInfo::SqlBatchUpdatesSupported, false);
.with_sql_info(SqlInfo::SqlNamedParametersSupported, false) builder.append(SqlInfo::SqlSavepointsSupported, false);
.with_sql_info(SqlInfo::SqlLocatorsUpdateCopy, false) builder.append(SqlInfo::SqlNamedParametersSupported, false);
.with_sql_info(SqlInfo::SqlStoredFunctionsUsingCallSyntaxSupported, false) builder.append(SqlInfo::SqlLocatorsUpdateCopy, false);
builder.append(SqlInfo::SqlStoredFunctionsUsingCallSyntaxSupported, false);
builder.build().expect("Successfully built metadata")
}); });
/// Return a [`SqlInfoList`] that describes IOx's capablities /// Return a [`SqlInfoData`] that describes IOx's capablities
pub fn iox_sql_info_list() -> &'static SqlInfoList { pub fn iox_sql_info_data() -> &'static SqlInfoData {
&INSTANCE &INSTANCE
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn filter_empty() {
let filter = &[];
assert_eq!(
iox_sql_info_list(),
iox_sql_info_list().filter(filter).as_ref()
)
}
#[test]
fn filter_some() {
let filter = &[
SqlInfo::FlightSqlServerName as u32,
SqlInfo::FlightSqlServerArrowVersion as u32,
SqlInfo::SqlBatchUpdatesSupported as u32,
999999, // model some unknown info requested
];
let result = iox_sql_info_list().filter(filter);
let infos = &result.infos;
assert_eq!(result.infos.len(), 3);
assert!(infos.contains_key(&(SqlInfo::FlightSqlServerName as u32)));
assert!(infos.contains_key(&(SqlInfo::FlightSqlServerArrowVersion as u32)));
assert!(infos.contains_key(&(SqlInfo::SqlBatchUpdatesSupported as u32)));
assert!(!infos.contains_key(&999999));
}
}

View File

@ -1,235 +0,0 @@
//! Dynamic value support for SqlInfo
use std::sync::Arc;
use arrow::{
array::{
Array, ArrayBuilder, ArrayData, BooleanBuilder, Int32Builder, Int64Builder, Int8Builder,
ListBuilder, StringBuilder, UnionArray,
},
datatypes::{DataType, Field, UnionFields, UnionMode},
};
use arrow_flight::sql::SqlInfo;
use once_cell::sync::Lazy;
/// Represents a dynamic value
#[derive(Debug, Clone, PartialEq)]
pub enum SqlInfoValue {
String(String),
Bool(bool),
BigInt(i64),
Bitmask(i32),
StringList(Vec<String>),
// TODO support more exotic metadata that requires the map of lists
//ListMap(BTreeMap<i32, Vec<i32>>),
}
impl From<&str> for SqlInfoValue {
fn from(value: &str) -> Self {
Self::String(value.to_string())
}
}
impl From<bool> for SqlInfoValue {
fn from(value: bool) -> Self {
Self::Bool(value)
}
}
impl From<i32> for SqlInfoValue {
fn from(value: i32) -> Self {
Self::Bitmask(value)
}
}
impl From<i64> for SqlInfoValue {
fn from(value: i64) -> Self {
Self::BigInt(value)
}
}
impl From<&[&str]> for SqlInfoValue {
fn from(values: &[&str]) -> Self {
let values = values.iter().map(|s| s.to_string()).collect();
Self::StringList(values)
}
}
/// Something that can be converted into u32 (the represenation of a
/// [`SqlInfo`] name)
pub trait SqlInfoName {
fn as_u32(&self) -> u32;
}
impl SqlInfoName for SqlInfo {
fn as_u32(&self) -> u32 {
// SqlInfos are u32 in the flight spec, but for some reason
// SqlInfo repr is an i32, so convert between them
u32::try_from(i32::from(*self)).expect("SqlInfo fit into u32")
}
}
// Allow passing u32 directly into to with_sql_info
impl SqlInfoName for u32 {
fn as_u32(&self) -> u32 {
*self
}
}
/// Handles creating the dense [`UnionArray`] described by [flightsql]
///
///
/// NOT YET COMPLETE: The int32_to_int32_list_map
///
/// ```text
/// * value: dense_union<
/// * string_value: utf8,
/// * bool_value: bool,
/// * bigint_value: int64,
/// * int32_bitmask: int32,
/// * string_list: list<string_data: utf8>
/// * int32_to_int32_list_map: map<key: int32, value: list<$data$: int32>>
/// * >
/// ```
///[flightsql]: (https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/FlightSql.proto#L32-L43
pub struct SqlInfoUnionBuilder {
// Values for each child type
string_values: StringBuilder,
bool_values: BooleanBuilder,
bigint_values: Int64Builder,
int32_bitmask_values: Int32Builder,
string_list_values: ListBuilder<StringBuilder>,
/// incrementally build types/offset of the dense union,
///
/// See [Union Spec] for details.
///
/// [Union Spec]: https://arrow.apache.org/docs/format/Columnar.html#dense-union
type_ids: Int8Builder,
offsets: Int32Builder,
}
/// [`DataType`] for the output union array
static UNION_TYPE: Lazy<DataType> = Lazy::new(|| {
let nullable = false;
let fields = vec![
Field::new("string_value", DataType::Utf8, nullable),
Field::new("bool_value", DataType::Boolean, nullable),
Field::new("bigint_value", DataType::Int64, nullable),
Field::new("int32_bitmask", DataType::Int32, nullable),
// treat list as nullable b/c that is what hte builders make
Field::new(
"string_list",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
true,
),
];
// create "type ids", one for each type
// assume they go from 0 .. num_fields
let type_ids: Vec<i8> = (0..fields.len()).map(|v| v as i8).collect();
DataType::Union(UnionFields::new(type_ids, fields), UnionMode::Dense)
});
impl SqlInfoUnionBuilder {
pub fn new() -> Self {
Self {
string_values: StringBuilder::new(),
bool_values: BooleanBuilder::new(),
bigint_values: Int64Builder::new(),
int32_bitmask_values: Int32Builder::new(),
string_list_values: ListBuilder::new(StringBuilder::new()),
type_ids: Int8Builder::new(),
offsets: Int32Builder::new(),
}
}
/// Returns the DataType created by this builder
pub fn schema() -> &'static DataType {
&UNION_TYPE
}
/// Append the specified value to this builder
pub fn append_value(&mut self, v: &SqlInfoValue) {
// typeid is which child and len is the child array's length
// *after* adding the value
let (type_id, len) = match v {
SqlInfoValue::String(v) => {
self.string_values.append_value(v);
(0, self.string_values.len())
}
SqlInfoValue::Bool(v) => {
self.bool_values.append_value(*v);
(1, self.bool_values.len())
}
SqlInfoValue::BigInt(v) => {
self.bigint_values.append_value(*v);
(2, self.bigint_values.len())
}
SqlInfoValue::Bitmask(v) => {
self.int32_bitmask_values.append_value(*v);
(3, self.int32_bitmask_values.len())
}
SqlInfoValue::StringList(values) => {
// build list
for v in values {
self.string_list_values.values().append_value(v);
}
// complete the list
self.string_list_values.append(true);
(4, self.string_list_values.len())
}
};
self.type_ids.append_value(type_id);
let len = i32::try_from(len).expect("offset fit in i32");
self.offsets.append_value(len - 1);
}
/// Complete the construction and build the [`UnionArray`]
pub fn finish(self) -> UnionArray {
let Self {
mut string_values,
mut bool_values,
mut bigint_values,
mut int32_bitmask_values,
mut string_list_values,
mut type_ids,
mut offsets,
} = self;
let type_ids = type_ids.finish();
let offsets = offsets.finish();
// form the correct ArrayData
let len = offsets.len();
let null_bit_buffer = None;
let offset = 0;
let buffers = vec![
type_ids.into_data().buffers()[0].clone(),
offsets.into_data().buffers()[0].clone(),
];
let child_data = vec![
string_values.finish().into_data(),
bool_values.finish().into_data(),
bigint_values.finish().into_data(),
int32_bitmask_values.finish().into_data(),
string_list_values.finish().into_data(),
];
let data = ArrayData::try_new(
UNION_TYPE.clone(),
len,
null_bit_buffer,
offset,
buffers,
child_data,
)
.expect("Correctly created UnionArray");
UnionArray::from(data)
}
}