diff --git a/flightsql/src/planner.rs b/flightsql/src/planner.rs index e2ffa0179c..b30aefd6ef 100644 --- a/flightsql/src/planner.rs +++ b/flightsql/src/planner.rs @@ -29,7 +29,7 @@ use observability_deps::tracing::debug; use once_cell::sync::Lazy; use prost::Message; -use crate::{error::*, sql_info::iox_sql_info_list, xdbc_type_info::xdbc_type_info_data}; +use crate::{error::*, sql_info::iox_sql_info_data, xdbc_type_info::xdbc_type_info_data}; use crate::{FlightSQLCommand, PreparedStatementHandle}; /// Logic for creating plans for various Flight messages against a query database @@ -58,7 +58,7 @@ impl FlightSQLPlanner { get_schema_for_query(handle.query(), ctx).await } FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { .. }) => { - Ok(iox_sql_info_list().schema()) + Ok(iox_sql_info_data().schema()) } FlightSQLCommand::CommandGetCatalogs(req) => Ok(req.into_builder().schema()), FlightSQLCommand::CommandGetCrossReference(CommandGetCrossReference { .. }) => { @@ -110,9 +110,9 @@ impl FlightSQLPlanner { debug!(%query, "Planning FlightSQL prepared query"); Ok(ctx.sql_to_physical_plan(query).await?) } - FlightSQLCommand::CommandGetSqlInfo(CommandGetSqlInfo { info }) => { - debug!("Planning GetSqlInfo query"); - let plan = plan_get_sql_info(ctx, info).await?; + FlightSQLCommand::CommandGetSqlInfo(cmd) => { + debug!(?cmd, "Planning GetSqlInfo query"); + let plan = plan_get_sql_info(ctx, cmd).await?; Ok(ctx.create_physical_plan(&plan).await?) } FlightSQLCommand::CommandGetCatalogs(cmd) => { @@ -307,10 +307,8 @@ fn encode_schema(schema: &Schema) -> Result { } /// Return a `LogicalPlan` for GetSqlInfo -/// -/// The infos are passed directly from the [`CommandGetSqlInfo::info`] -async fn plan_get_sql_info(ctx: &IOxSessionContext, info: Vec) -> Result { - let batch = iox_sql_info_list().filter(&info).encode()?; +async fn plan_get_sql_info(ctx: &IOxSessionContext, cmd: CommandGetSqlInfo) -> Result { + let batch = cmd.into_builder(iox_sql_info_data()).build()?; Ok(ctx.batch_to_logical_plan(batch)?) } diff --git a/flightsql/src/sql_info/mod.rs b/flightsql/src/sql_info/mod.rs index 77179a635a..6ef906d170 100644 --- a/flightsql/src/sql_info/mod.rs +++ b/flightsql/src/sql_info/mod.rs @@ -1,6 +1,3 @@ -//! TODO: use version in flight-sql when -//! is available -//! //! Represents the response to FlightSQL `GetSqlInfo` requests and //! handles the conversion to/from the format specified in the //! [Arrow FlightSQL Specification]. @@ -22,17 +19,9 @@ //! [Arrow FlightSQL Specification]: https://github.com/apache/arrow/blob/f1eece9f276184063c9c35011e8243eb3b071233/format/FlightSql.proto#L33-L42 mod meta; -mod value; -use crate::error::Result; -use std::{borrow::Cow, collections::BTreeMap, sync::Arc}; - -use arrow::{ - array::UInt32Builder, - datatypes::{DataType, Field, Schema, SchemaRef}, - record_batch::RecordBatch, -}; use arrow_flight::sql::{ + metadata::{SqlInfoData, SqlInfoDataBuilder}, SqlInfo, SqlNullOrdering, SqlSupportedCaseSensitivity, SqlSupportedTransactions, SupportedSqlGrammar, }; @@ -42,82 +31,9 @@ use meta::{ SQL_INFO_DATE_TIME_FUNCTIONS, SQL_INFO_NUMERIC_FUNCTIONS, SQL_INFO_SQL_KEYWORDS, SQL_INFO_STRING_FUNCTIONS, SQL_INFO_SYSTEM_FUNCTIONS, }; -use value::{SqlInfoName, SqlInfoUnionBuilder, SqlInfoValue}; - -/// A list of SQL info names and valies -#[derive(Debug, Clone, PartialEq)] -pub struct SqlInfoList { - /// Use BTreeMap to ensure the values are sorted by value as - /// to make output consistent - /// - /// Use u32 to support "custom" sql info values that are not - /// part of the SqlInfo enum - infos: BTreeMap, -} - -impl SqlInfoList { - pub fn new() -> Self { - Self { - infos: BTreeMap::new(), - } - } - - /// register the specific sql metadata item - fn with_sql_info(mut self, name: impl SqlInfoName, value: impl Into) -> Self { - self.infos.insert(name.as_u32(), value.into()); - self - } - - /// Filter this info list keeping only the info values specified - /// in `infos`. - /// - /// Returns self if infos is empty (no filtering) - pub fn filter(&self, info: &[u32]) -> Cow<'_, Self> { - if info.is_empty() { - Cow::Borrowed(self) - } else { - let infos: BTreeMap<_, _> = info - .iter() - .filter_map(|name| self.infos.get(name).map(|v| (*name, v.clone()))) - .collect(); - Cow::Owned(Self { infos }) - } - } - - /// Encode the contents of this info list according to the FlightSQL spec - pub fn encode(&self) -> Result { - let mut name_builder = UInt32Builder::new(); - let mut value_builder = SqlInfoUnionBuilder::new(); - - for (&name, value) in &self.infos { - name_builder.append_value(name); - value_builder.append_value(value) - } - - let batch = RecordBatch::try_from_iter(vec![ - ("info_name", Arc::new(name_builder.finish()) as _), - ("value", Arc::new(value_builder.finish()) as _), - ])?; - Ok(batch) - } - - /// Return the schema for the record batches produced - pub fn schema(&self) -> SchemaRef { - // It is always the same - Arc::clone(&SCHEMA) - } -} - -// The schema produced by [`SqlInfoList`] -static SCHEMA: Lazy = Lazy::new(|| { - Arc::new(Schema::new(vec![ - Field::new("info_name", DataType::UInt32, false), - Field::new("value", SqlInfoUnionBuilder::schema().clone(), false), - ])) -}); #[allow(non_snake_case)] -static INSTANCE: Lazy = Lazy::new(|| { +static INSTANCE: Lazy = Lazy::new(|| { // The following are not defined in the [`SqlInfo`], but are // documented at // https://arrow.apache.org/docs/format/FlightSql.html#protocol-buffer-definitions. @@ -133,164 +49,135 @@ static INSTANCE: Lazy = Lazy::new(|| { // Copied from https://github.com/influxdata/idpe/blob/85aa7a52b40f173cc4d79ac02b3a4a13e82333c4/queryrouter/internal/server/flightsql_handler.go#L208-L275 - SqlInfoList::new() - // Server information - .with_sql_info(SqlInfo::FlightSqlServerName, "InfluxDB IOx") - .with_sql_info(SqlInfo::FlightSqlServerVersion, "2") - // 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24 - .with_sql_info(SqlInfo::FlightSqlServerArrowVersion, "1.3") - .with_sql_info(SqlInfo::FlightSqlServerReadOnly, true) - .with_sql_info(SqlInfoFlightSqlServerSql, true) - .with_sql_info(SqlInfoFlightSqlServerSubstrait, false) - .with_sql_info( - SqlInfoFlightSqlServerTransaction, - SqlSupportedTransactions::SqlTransactionUnspecified as i32, - ) - // don't yetsupport `CancelQuery` action - .with_sql_info(SqlInfoFlightSqlServerCancel, false) - .with_sql_info(SqlInfoFlightSqlServerStatementTimeout, 0i32) - .with_sql_info(SqlInfoFlightSqlServerTransactionTimeout, 0i32) - // SQL syntax information - .with_sql_info(SqlInfo::SqlDdlCatalog, false) - .with_sql_info(SqlInfo::SqlDdlSchema, false) - .with_sql_info(SqlInfo::SqlDdlTable, false) - .with_sql_info( - SqlInfo::SqlIdentifierCase, - SqlSupportedCaseSensitivity::SqlCaseSensitivityLowercase as i32, - ) - .with_sql_info(SqlInfo::SqlIdentifierQuoteChar, r#"""#) - .with_sql_info( - SqlInfo::SqlQuotedIdentifierCase, - SqlSupportedCaseSensitivity::SqlCaseSensitivityCaseInsensitive as i32, - ) - .with_sql_info(SqlInfo::SqlAllTablesAreSelectable, true) - .with_sql_info( - SqlInfo::SqlNullOrdering, - SqlNullOrdering::SqlNullsSortedHigh as i32, - ) - .with_sql_info(SqlInfo::SqlKeywords, SQL_INFO_SQL_KEYWORDS) - .with_sql_info(SqlInfo::SqlNumericFunctions, SQL_INFO_NUMERIC_FUNCTIONS) - .with_sql_info(SqlInfo::SqlStringFunctions, SQL_INFO_STRING_FUNCTIONS) - .with_sql_info(SqlInfo::SqlSystemFunctions, SQL_INFO_SYSTEM_FUNCTIONS) - .with_sql_info(SqlInfo::SqlDatetimeFunctions, SQL_INFO_DATE_TIME_FUNCTIONS) - .with_sql_info(SqlInfo::SqlSearchStringEscape, "\\") - .with_sql_info(SqlInfo::SqlExtraNameCharacters, "") - .with_sql_info(SqlInfo::SqlSupportsColumnAliasing, true) - .with_sql_info(SqlInfo::SqlNullPlusNullIsNull, true) - // Skip SqlSupportsConvert (which is the map of the conversions that are supported) - // .with_sql_info(SqlInfo::SqlSupportsConvert, TBD) - // https://github.com/influxdata/influxdb_iox/issues/7253 - .with_sql_info(SqlInfo::SqlSupportsTableCorrelationNames, false) - .with_sql_info(SqlInfo::SqlSupportsDifferentTableCorrelationNames, false) - .with_sql_info(SqlInfo::SqlSupportsExpressionsInOrderBy, true) - .with_sql_info(SqlInfo::SqlSupportsOrderByUnrelated, true) - .with_sql_info(SqlInfo::SqlSupportedGroupBy, 3i32) - .with_sql_info(SqlInfo::SqlSupportsLikeEscapeClause, true) - .with_sql_info(SqlInfo::SqlSupportsNonNullableColumns, true) - .with_sql_info( - SqlInfo::SqlSupportedGrammar, - SupportedSqlGrammar::SqlCoreGrammar as i32, - ) - // report IOx supports all ansi 92 - .with_sql_info(SqlInfo::SqlAnsi92SupportedLevel, 0b111_i32) - .with_sql_info(SqlInfo::SqlSupportsIntegrityEnhancementFacility, false) - .with_sql_info(SqlInfo::SqlOuterJoinsSupportLevel, 2i32) - .with_sql_info(SqlInfo::SqlSchemaTerm, "schema") - .with_sql_info(SqlInfo::SqlProcedureTerm, "procedure") - .with_sql_info(SqlInfo::SqlCatalogAtStart, false) - .with_sql_info(SqlInfo::SqlSchemasSupportedActions, 0i32) - .with_sql_info(SqlInfo::SqlCatalogsSupportedActions, 0i32) - .with_sql_info(SqlInfo::SqlSupportedPositionedCommands, 0i32) - .with_sql_info(SqlInfo::SqlSelectForUpdateSupported, false) - .with_sql_info(SqlInfo::SqlStoredProceduresSupported, false) - .with_sql_info(SqlInfo::SqlSupportedSubqueries, 15i32) - .with_sql_info(SqlInfo::SqlCorrelatedSubqueriesSupported, true) - .with_sql_info(SqlInfo::SqlSupportedUnions, 3i32) - // For max lengths, report max arrow string length (IOx - // doesn't enfore many of these limits yet - .with_sql_info(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxCharLiteralLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxColumnNameLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxColumnsInGroupBy, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxColumnsInIndex, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxColumnsInOrderBy, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxColumnsInSelect, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxColumnsInTable, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxConnections, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxCursorNameLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxIndexLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlDbSchemaNameLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxProcedureNameLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxCatalogNameLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxRowSize, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxRowSizeIncludesBlobs, true) - .with_sql_info(SqlInfo::SqlMaxStatementLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxStatements, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxTableNameLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxTablesInSelect, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlMaxUsernameLength, i32::MAX as i64) - .with_sql_info(SqlInfo::SqlDefaultTransactionIsolation, 0i64) - .with_sql_info(SqlInfo::SqlTransactionsSupported, false) - .with_sql_info(SqlInfo::SqlSupportedTransactionsIsolationLevels, 0i32) - .with_sql_info(SqlInfo::SqlDataDefinitionCausesTransactionCommit, false) - .with_sql_info(SqlInfo::SqlDataDefinitionsInTransactionsIgnored, true) - .with_sql_info(SqlInfo::SqlSupportedResultSetTypes, 0i32) - .with_sql_info( - SqlInfo::SqlSupportedConcurrenciesForResultSetUnspecified, - 0i32, - ) - .with_sql_info( - SqlInfo::SqlSupportedConcurrenciesForResultSetForwardOnly, - 0i32, - ) - .with_sql_info( - SqlInfo::SqlSupportedConcurrenciesForResultSetScrollSensitive, - 0i32, - ) - .with_sql_info( - SqlInfo::SqlSupportedConcurrenciesForResultSetScrollInsensitive, - 0i32, - ) - .with_sql_info(SqlInfo::SqlBatchUpdatesSupported, false) - .with_sql_info(SqlInfo::SqlSavepointsSupported, false) - .with_sql_info(SqlInfo::SqlNamedParametersSupported, false) - .with_sql_info(SqlInfo::SqlLocatorsUpdateCopy, false) - .with_sql_info(SqlInfo::SqlStoredFunctionsUsingCallSyntaxSupported, false) + let mut builder = SqlInfoDataBuilder::new(); + + // Server information + builder.append(SqlInfo::FlightSqlServerName, "InfluxDB IOx"); + builder.append(SqlInfo::FlightSqlServerVersion, "2"); + // 1.3 comes from https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/Schema.fbs#L24 + builder.append(SqlInfo::FlightSqlServerArrowVersion, "1.3"); + builder.append(SqlInfo::FlightSqlServerReadOnly, true); + builder.append(SqlInfoFlightSqlServerSql, true); + builder.append(SqlInfoFlightSqlServerSubstrait, false); + builder.append( + SqlInfoFlightSqlServerTransaction, + SqlSupportedTransactions::SqlTransactionUnspecified as i32, + ); + // don't yetsupport `CancelQuery` action + builder.append(SqlInfoFlightSqlServerCancel, false); + builder.append(SqlInfoFlightSqlServerStatementTimeout, 0i32); + builder.append(SqlInfoFlightSqlServerTransactionTimeout, 0i32); + // SQL syntax information + builder.append(SqlInfo::SqlDdlCatalog, false); + builder.append(SqlInfo::SqlDdlSchema, false); + builder.append(SqlInfo::SqlDdlTable, false); + builder.append( + SqlInfo::SqlIdentifierCase, + SqlSupportedCaseSensitivity::SqlCaseSensitivityLowercase as i32, + ); + builder.append(SqlInfo::SqlIdentifierQuoteChar, r#"""#); + builder.append( + SqlInfo::SqlQuotedIdentifierCase, + SqlSupportedCaseSensitivity::SqlCaseSensitivityCaseInsensitive as i32, + ); + builder.append(SqlInfo::SqlAllTablesAreSelectable, true); + builder.append( + SqlInfo::SqlNullOrdering, + SqlNullOrdering::SqlNullsSortedHigh as i32, + ); + builder.append(SqlInfo::SqlKeywords, SQL_INFO_SQL_KEYWORDS); + builder.append(SqlInfo::SqlNumericFunctions, SQL_INFO_NUMERIC_FUNCTIONS); + builder.append(SqlInfo::SqlStringFunctions, SQL_INFO_STRING_FUNCTIONS); + builder.append(SqlInfo::SqlSystemFunctions, SQL_INFO_SYSTEM_FUNCTIONS); + builder.append(SqlInfo::SqlDatetimeFunctions, SQL_INFO_DATE_TIME_FUNCTIONS); + builder.append(SqlInfo::SqlSearchStringEscape, "\\"); + builder.append(SqlInfo::SqlExtraNameCharacters, ""); + builder.append(SqlInfo::SqlSupportsColumnAliasing, true); + builder.append(SqlInfo::SqlNullPlusNullIsNull, true); + // Skip SqlSupportsConvert (which is the map of the conversions that are supported); + // .with_sql_info(SqlInfo::SqlSupportsConvert, TBD); + // https://github.com/influxdata/influxdb_iox/issues/7253 + builder.append(SqlInfo::SqlSupportsTableCorrelationNames, false); + builder.append(SqlInfo::SqlSupportsDifferentTableCorrelationNames, false); + builder.append(SqlInfo::SqlSupportsExpressionsInOrderBy, true); + builder.append(SqlInfo::SqlSupportsOrderByUnrelated, true); + builder.append(SqlInfo::SqlSupportedGroupBy, 3i32); + builder.append(SqlInfo::SqlSupportsLikeEscapeClause, true); + builder.append(SqlInfo::SqlSupportsNonNullableColumns, true); + builder.append( + SqlInfo::SqlSupportedGrammar, + SupportedSqlGrammar::SqlCoreGrammar as i32, + ); + // report IOx supports all ansi 92 + builder.append(SqlInfo::SqlAnsi92SupportedLevel, 0b111_i32); + builder.append(SqlInfo::SqlSupportsIntegrityEnhancementFacility, false); + builder.append(SqlInfo::SqlOuterJoinsSupportLevel, 2i32); + builder.append(SqlInfo::SqlSchemaTerm, "schema"); + builder.append(SqlInfo::SqlProcedureTerm, "procedure"); + builder.append(SqlInfo::SqlCatalogAtStart, false); + builder.append(SqlInfo::SqlSchemasSupportedActions, 0i32); + builder.append(SqlInfo::SqlCatalogsSupportedActions, 0i32); + builder.append(SqlInfo::SqlSupportedPositionedCommands, 0i32); + builder.append(SqlInfo::SqlSelectForUpdateSupported, false); + builder.append(SqlInfo::SqlStoredProceduresSupported, false); + builder.append(SqlInfo::SqlSupportedSubqueries, 15i32); + builder.append(SqlInfo::SqlCorrelatedSubqueriesSupported, true); + builder.append(SqlInfo::SqlSupportedUnions, 3i32); + // For max lengths, report max arrow string length (IOx + // doesn't enfore many of these limits yet + builder.append(SqlInfo::SqlMaxBinaryLiteralLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxCharLiteralLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxColumnNameLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxColumnsInGroupBy, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxColumnsInIndex, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxColumnsInOrderBy, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxColumnsInSelect, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxColumnsInTable, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxConnections, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxCursorNameLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxIndexLength, i32::MAX as i64); + builder.append(SqlInfo::SqlDbSchemaNameLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxProcedureNameLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxCatalogNameLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxRowSize, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxRowSizeIncludesBlobs, true); + builder.append(SqlInfo::SqlMaxStatementLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxStatements, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxTableNameLength, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxTablesInSelect, i32::MAX as i64); + builder.append(SqlInfo::SqlMaxUsernameLength, i32::MAX as i64); + builder.append(SqlInfo::SqlDefaultTransactionIsolation, 0i64); + builder.append(SqlInfo::SqlTransactionsSupported, false); + builder.append(SqlInfo::SqlSupportedTransactionsIsolationLevels, 0i32); + builder.append(SqlInfo::SqlDataDefinitionCausesTransactionCommit, false); + builder.append(SqlInfo::SqlDataDefinitionsInTransactionsIgnored, true); + builder.append(SqlInfo::SqlSupportedResultSetTypes, 0i32); + builder.append( + SqlInfo::SqlSupportedConcurrenciesForResultSetUnspecified, + 0i32, + ); + builder.append( + SqlInfo::SqlSupportedConcurrenciesForResultSetForwardOnly, + 0i32, + ); + builder.append( + SqlInfo::SqlSupportedConcurrenciesForResultSetScrollSensitive, + 0i32, + ); + builder.append( + SqlInfo::SqlSupportedConcurrenciesForResultSetScrollInsensitive, + 0i32, + ); + builder.append(SqlInfo::SqlBatchUpdatesSupported, false); + builder.append(SqlInfo::SqlSavepointsSupported, false); + builder.append(SqlInfo::SqlNamedParametersSupported, false); + builder.append(SqlInfo::SqlLocatorsUpdateCopy, false); + builder.append(SqlInfo::SqlStoredFunctionsUsingCallSyntaxSupported, false); + + builder.build().expect("Successfully built metadata") }); -/// Return a [`SqlInfoList`] that describes IOx's capablities -pub fn iox_sql_info_list() -> &'static SqlInfoList { +/// Return a [`SqlInfoData`] that describes IOx's capablities +pub fn iox_sql_info_data() -> &'static SqlInfoData { &INSTANCE } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn filter_empty() { - let filter = &[]; - assert_eq!( - iox_sql_info_list(), - iox_sql_info_list().filter(filter).as_ref() - ) - } - - #[test] - fn filter_some() { - let filter = &[ - SqlInfo::FlightSqlServerName as u32, - SqlInfo::FlightSqlServerArrowVersion as u32, - SqlInfo::SqlBatchUpdatesSupported as u32, - 999999, // model some unknown info requested - ]; - let result = iox_sql_info_list().filter(filter); - - let infos = &result.infos; - assert_eq!(result.infos.len(), 3); - assert!(infos.contains_key(&(SqlInfo::FlightSqlServerName as u32))); - assert!(infos.contains_key(&(SqlInfo::FlightSqlServerArrowVersion as u32))); - assert!(infos.contains_key(&(SqlInfo::SqlBatchUpdatesSupported as u32))); - assert!(!infos.contains_key(&999999)); - } -} diff --git a/flightsql/src/sql_info/value.rs b/flightsql/src/sql_info/value.rs deleted file mode 100644 index 5bd9c1d04f..0000000000 --- a/flightsql/src/sql_info/value.rs +++ /dev/null @@ -1,235 +0,0 @@ -//! Dynamic value support for SqlInfo - -use std::sync::Arc; - -use arrow::{ - array::{ - Array, ArrayBuilder, ArrayData, BooleanBuilder, Int32Builder, Int64Builder, Int8Builder, - ListBuilder, StringBuilder, UnionArray, - }, - datatypes::{DataType, Field, UnionFields, UnionMode}, -}; -use arrow_flight::sql::SqlInfo; -use once_cell::sync::Lazy; - -/// Represents a dynamic value -#[derive(Debug, Clone, PartialEq)] -pub enum SqlInfoValue { - String(String), - Bool(bool), - BigInt(i64), - Bitmask(i32), - StringList(Vec), - // TODO support more exotic metadata that requires the map of lists - //ListMap(BTreeMap>), -} - -impl From<&str> for SqlInfoValue { - fn from(value: &str) -> Self { - Self::String(value.to_string()) - } -} - -impl From for SqlInfoValue { - fn from(value: bool) -> Self { - Self::Bool(value) - } -} - -impl From for SqlInfoValue { - fn from(value: i32) -> Self { - Self::Bitmask(value) - } -} - -impl From for SqlInfoValue { - fn from(value: i64) -> Self { - Self::BigInt(value) - } -} - -impl From<&[&str]> for SqlInfoValue { - fn from(values: &[&str]) -> Self { - let values = values.iter().map(|s| s.to_string()).collect(); - Self::StringList(values) - } -} - -/// Something that can be converted into u32 (the represenation of a -/// [`SqlInfo`] name) -pub trait SqlInfoName { - fn as_u32(&self) -> u32; -} - -impl SqlInfoName for SqlInfo { - fn as_u32(&self) -> u32 { - // SqlInfos are u32 in the flight spec, but for some reason - // SqlInfo repr is an i32, so convert between them - u32::try_from(i32::from(*self)).expect("SqlInfo fit into u32") - } -} - -// Allow passing u32 directly into to with_sql_info -impl SqlInfoName for u32 { - fn as_u32(&self) -> u32 { - *self - } -} - -/// Handles creating the dense [`UnionArray`] described by [flightsql] -/// -/// -/// NOT YET COMPLETE: The int32_to_int32_list_map -/// -/// ```text -/// * value: dense_union< -/// * string_value: utf8, -/// * bool_value: bool, -/// * bigint_value: int64, -/// * int32_bitmask: int32, -/// * string_list: list -/// * int32_to_int32_list_map: map> -/// * > -/// ``` -///[flightsql]: (https://github.com/apache/arrow/blob/f9324b79bf4fc1ec7e97b32e3cce16e75ef0f5e3/format/FlightSql.proto#L32-L43 -pub struct SqlInfoUnionBuilder { - // Values for each child type - string_values: StringBuilder, - bool_values: BooleanBuilder, - bigint_values: Int64Builder, - int32_bitmask_values: Int32Builder, - string_list_values: ListBuilder, - - /// incrementally build types/offset of the dense union, - /// - /// See [Union Spec] for details. - /// - /// [Union Spec]: https://arrow.apache.org/docs/format/Columnar.html#dense-union - type_ids: Int8Builder, - offsets: Int32Builder, -} - -/// [`DataType`] for the output union array -static UNION_TYPE: Lazy = Lazy::new(|| { - let nullable = false; - let fields = vec![ - Field::new("string_value", DataType::Utf8, nullable), - Field::new("bool_value", DataType::Boolean, nullable), - Field::new("bigint_value", DataType::Int64, nullable), - Field::new("int32_bitmask", DataType::Int32, nullable), - // treat list as nullable b/c that is what hte builders make - Field::new( - "string_list", - DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))), - true, - ), - ]; - - // create "type ids", one for each type - // assume they go from 0 .. num_fields - let type_ids: Vec = (0..fields.len()).map(|v| v as i8).collect(); - - DataType::Union(UnionFields::new(type_ids, fields), UnionMode::Dense) -}); - -impl SqlInfoUnionBuilder { - pub fn new() -> Self { - Self { - string_values: StringBuilder::new(), - bool_values: BooleanBuilder::new(), - bigint_values: Int64Builder::new(), - int32_bitmask_values: Int32Builder::new(), - string_list_values: ListBuilder::new(StringBuilder::new()), - type_ids: Int8Builder::new(), - offsets: Int32Builder::new(), - } - } - - /// Returns the DataType created by this builder - pub fn schema() -> &'static DataType { - &UNION_TYPE - } - - /// Append the specified value to this builder - pub fn append_value(&mut self, v: &SqlInfoValue) { - // typeid is which child and len is the child array's length - // *after* adding the value - let (type_id, len) = match v { - SqlInfoValue::String(v) => { - self.string_values.append_value(v); - (0, self.string_values.len()) - } - SqlInfoValue::Bool(v) => { - self.bool_values.append_value(*v); - (1, self.bool_values.len()) - } - SqlInfoValue::BigInt(v) => { - self.bigint_values.append_value(*v); - (2, self.bigint_values.len()) - } - SqlInfoValue::Bitmask(v) => { - self.int32_bitmask_values.append_value(*v); - (3, self.int32_bitmask_values.len()) - } - SqlInfoValue::StringList(values) => { - // build list - for v in values { - self.string_list_values.values().append_value(v); - } - // complete the list - self.string_list_values.append(true); - (4, self.string_list_values.len()) - } - }; - - self.type_ids.append_value(type_id); - let len = i32::try_from(len).expect("offset fit in i32"); - self.offsets.append_value(len - 1); - } - - /// Complete the construction and build the [`UnionArray`] - pub fn finish(self) -> UnionArray { - let Self { - mut string_values, - mut bool_values, - mut bigint_values, - mut int32_bitmask_values, - mut string_list_values, - mut type_ids, - mut offsets, - } = self; - let type_ids = type_ids.finish(); - let offsets = offsets.finish(); - - // form the correct ArrayData - - let len = offsets.len(); - let null_bit_buffer = None; - let offset = 0; - - let buffers = vec![ - type_ids.into_data().buffers()[0].clone(), - offsets.into_data().buffers()[0].clone(), - ]; - - let child_data = vec![ - string_values.finish().into_data(), - bool_values.finish().into_data(), - bigint_values.finish().into_data(), - int32_bitmask_values.finish().into_data(), - string_list_values.finish().into_data(), - ]; - - let data = ArrayData::try_new( - UNION_TYPE.clone(), - len, - null_bit_buffer, - offset, - buffers, - child_data, - ) - .expect("Correctly created UnionArray"); - - UnionArray::from(data) - } -}