feat(flightsql): Add filters to `GetTables` metadata endpoint (#7289)
* feat: add optional param to GetTables * chore: add the third param to query plan * feat: add table_types param * chore: clippy * test: add test cases with filters * chore: update query to avoid SQL injection * refactor: update where clause and cleanup --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
6172e6c513
commit
87708dc64a
|
@ -123,7 +123,14 @@ impl FlightSQLPlanner {
|
|||
table_types,
|
||||
include_schema,
|
||||
}) => {
|
||||
debug!("Planning GetTables query");
|
||||
debug!(
|
||||
?catalog,
|
||||
?db_schema_filter_pattern,
|
||||
?table_name_filter_pattern,
|
||||
?table_types,
|
||||
?include_schema,
|
||||
"Planning GetTables query"
|
||||
);
|
||||
let plan = plan_get_tables(
|
||||
ctx,
|
||||
catalog,
|
||||
|
@ -302,18 +309,62 @@ static GET_DB_SCHEMAS_SCHEMA: Lazy<Schema> = Lazy::new(|| {
|
|||
/// Return a `LogicalPlan` for GetTables
|
||||
async fn plan_get_tables(
|
||||
ctx: &IOxSessionContext,
|
||||
_catalog: Option<String>,
|
||||
_db_schema_filter_pattern: Option<String>,
|
||||
_table_name_filter_pattern: Option<String>,
|
||||
_table_types: Vec<String>,
|
||||
catalog: Option<String>,
|
||||
db_schema_filter_pattern: Option<String>,
|
||||
table_name_filter_pattern: Option<String>,
|
||||
table_types: Vec<String>,
|
||||
_include_schema: bool,
|
||||
) -> Result<LogicalPlan> {
|
||||
let query = "SELECT table_catalog AS catalog_name, \
|
||||
// use '%' to match anything if filters are not specified
|
||||
let catalog = catalog.unwrap_or_else(|| String::from("%"));
|
||||
let db_schema_filter_pattern = db_schema_filter_pattern.unwrap_or_else(|| String::from("%"));
|
||||
let table_name_filter_pattern = table_name_filter_pattern.unwrap_or_else(|| String::from("%"));
|
||||
|
||||
let has_base_tables = table_types.iter().any(|t| t == "BASE TABLE");
|
||||
let has_views = table_types.iter().any(|t| t == "VIEW");
|
||||
|
||||
let where_clause = match (table_types.is_empty(), has_base_tables, has_views) {
|
||||
// user input `table_types` is not in IOx
|
||||
(false, false, false) => "false",
|
||||
(true, _, _) => {
|
||||
// `table_types` is an empty vec![]
|
||||
"table_catalog like $1 AND table_schema like $2 AND table_name like $3"
|
||||
}
|
||||
(false, false, true) => {
|
||||
"table_catalog like $1 AND table_schema like $2 AND table_name like $3 \
|
||||
AND table_type = 'VIEW'"
|
||||
}
|
||||
(false, true, false) => {
|
||||
"table_catalog like $1 AND table_schema like $2 AND table_name like $3 \
|
||||
AND table_type = 'BASE TABLE'"
|
||||
}
|
||||
(false, true, true) => {
|
||||
"table_catalog like $1 AND table_schema like $2 AND table_name like $3 \
|
||||
AND table_type IN ('BASE TABLE', 'VIEW')"
|
||||
}
|
||||
};
|
||||
|
||||
// the WHERE clause is being constructed from known strings (rather than using
|
||||
// `table_types` directly in the SQL) to avoid a SQL injection attack
|
||||
let query = format!(
|
||||
"PREPARE my_plan(VARCHAR, VARCHAR, VARCHAR) AS \
|
||||
SELECT table_catalog AS catalog_name, \
|
||||
table_schema AS db_schema_name, table_name, table_type \
|
||||
FROM information_schema.tables \
|
||||
ORDER BY table_catalog, table_schema, table_name";
|
||||
WHERE {} \
|
||||
ORDER BY table_catalog, table_schema, table_name",
|
||||
where_clause
|
||||
);
|
||||
|
||||
Ok(ctx.sql_to_logical_plan(query).await?)
|
||||
let params = vec![
|
||||
ScalarValue::Utf8(Some(catalog)),
|
||||
ScalarValue::Utf8(Some(db_schema_filter_pattern)),
|
||||
ScalarValue::Utf8(Some(table_name_filter_pattern)),
|
||||
];
|
||||
|
||||
let plan = ctx.sql_to_logical_plan(query.as_str()).await?;
|
||||
debug!(?plan, "Prepared plan is");
|
||||
Ok(plan.with_param_values(params)?)
|
||||
}
|
||||
|
||||
/// The schema for GetTables
|
||||
|
@ -333,6 +384,7 @@ static GET_TABLES_SCHEMA: Lazy<Schema> = Lazy::new(|| {
|
|||
/// entire DataFusion plan.
|
||||
async fn plan_get_table_types(ctx: &IOxSessionContext) -> Result<LogicalPlan> {
|
||||
let query = "SELECT DISTINCT table_type FROM information_schema.tables ORDER BY table_type";
|
||||
|
||||
Ok(ctx.sql_to_logical_plan(query).await?)
|
||||
}
|
||||
|
||||
|
|
|
@ -290,18 +290,98 @@ async fn flightsql_get_tables() {
|
|||
)),
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
struct TestCase {
|
||||
catalog: Option<&'static str>,
|
||||
db_schema_filter_pattern: Option<&'static str>,
|
||||
table_name_filter_pattern: Option<&'static str>,
|
||||
table_types: Vec<String>,
|
||||
include_schema: bool,
|
||||
}
|
||||
let cases = [
|
||||
TestCase {
|
||||
catalog: None,
|
||||
db_schema_filter_pattern: None,
|
||||
table_name_filter_pattern: None,
|
||||
table_types: vec![],
|
||||
include_schema: false,
|
||||
},
|
||||
TestCase {
|
||||
catalog: None,
|
||||
db_schema_filter_pattern: None,
|
||||
table_name_filter_pattern: None,
|
||||
table_types: vec!["BASE TABLE".to_string()],
|
||||
include_schema: false,
|
||||
},
|
||||
TestCase {
|
||||
catalog: None,
|
||||
db_schema_filter_pattern: None,
|
||||
table_name_filter_pattern: None,
|
||||
// BASE <> BASE TABLE
|
||||
table_types: vec!["BASE".to_string()],
|
||||
include_schema: false,
|
||||
},
|
||||
TestCase {
|
||||
catalog: None,
|
||||
db_schema_filter_pattern: None,
|
||||
table_name_filter_pattern: None,
|
||||
table_types: vec!["RANDOM".to_string()],
|
||||
include_schema: false,
|
||||
},
|
||||
TestCase {
|
||||
catalog: Some("public"),
|
||||
db_schema_filter_pattern: Some("information_schema"),
|
||||
table_name_filter_pattern: Some("tables"),
|
||||
table_types: vec!["VIEW".to_string()],
|
||||
include_schema: false,
|
||||
},
|
||||
];
|
||||
|
||||
let mut client = flightsql_client(state.cluster());
|
||||
|
||||
let stream = client
|
||||
.get_tables(Some(""), Some(""), Some(""), None)
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = collect_stream(stream).await;
|
||||
let mut output = vec![];
|
||||
for case in cases {
|
||||
let TestCase {
|
||||
catalog,
|
||||
db_schema_filter_pattern,
|
||||
table_name_filter_pattern,
|
||||
table_types,
|
||||
include_schema,
|
||||
} = case;
|
||||
|
||||
output.push(format!("catalog:{catalog:?}"));
|
||||
output.push(format!(
|
||||
"db_schema_filter_pattern:{db_schema_filter_pattern:?}"
|
||||
));
|
||||
output.push(format!(
|
||||
"table_name_filter_pattern:{table_name_filter_pattern:?}"
|
||||
));
|
||||
output.push(format!("table_types:{table_types:?}"));
|
||||
output.push(format!("include_schema:{include_schema:?}"));
|
||||
output.push("*********************".into());
|
||||
|
||||
let stream = client
|
||||
.get_tables(
|
||||
catalog,
|
||||
db_schema_filter_pattern,
|
||||
table_name_filter_pattern,
|
||||
table_types,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = collect_stream(stream).await;
|
||||
output.extend(batches_to_sorted_lines(&batches))
|
||||
}
|
||||
|
||||
insta::assert_yaml_snapshot!(
|
||||
batches_to_sorted_lines(&batches),
|
||||
output,
|
||||
@r###"
|
||||
---
|
||||
- "catalog:None"
|
||||
- "db_schema_filter_pattern:None"
|
||||
- "table_name_filter_pattern:None"
|
||||
- "table_types:[]"
|
||||
- "include_schema:false"
|
||||
- "*********************"
|
||||
- +--------------+--------------------+-------------+------------+
|
||||
- "| catalog_name | db_schema_name | table_name | table_type |"
|
||||
- +--------------+--------------------+-------------+------------+
|
||||
|
@ -312,6 +392,45 @@ async fn flightsql_get_tables() {
|
|||
- "| public | iox | the_table | BASE TABLE |"
|
||||
- "| public | system | queries | BASE TABLE |"
|
||||
- +--------------+--------------------+-------------+------------+
|
||||
- "catalog:None"
|
||||
- "db_schema_filter_pattern:None"
|
||||
- "table_name_filter_pattern:None"
|
||||
- "table_types:[\"BASE TABLE\"]"
|
||||
- "include_schema:false"
|
||||
- "*********************"
|
||||
- +--------------+----------------+------------+------------+
|
||||
- "| catalog_name | db_schema_name | table_name | table_type |"
|
||||
- +--------------+----------------+------------+------------+
|
||||
- "| public | iox | the_table | BASE TABLE |"
|
||||
- "| public | system | queries | BASE TABLE |"
|
||||
- +--------------+----------------+------------+------------+
|
||||
- "catalog:None"
|
||||
- "db_schema_filter_pattern:None"
|
||||
- "table_name_filter_pattern:None"
|
||||
- "table_types:[\"BASE\"]"
|
||||
- "include_schema:false"
|
||||
- "*********************"
|
||||
- ++
|
||||
- ++
|
||||
- "catalog:None"
|
||||
- "db_schema_filter_pattern:None"
|
||||
- "table_name_filter_pattern:None"
|
||||
- "table_types:[\"RANDOM\"]"
|
||||
- "include_schema:false"
|
||||
- "*********************"
|
||||
- ++
|
||||
- ++
|
||||
- "catalog:Some(\"public\")"
|
||||
- "db_schema_filter_pattern:Some(\"information_schema\")"
|
||||
- "table_name_filter_pattern:Some(\"tables\")"
|
||||
- "table_types:[\"VIEW\"]"
|
||||
- "include_schema:false"
|
||||
- "*********************"
|
||||
- +--------------+--------------------+------------+------------+
|
||||
- "| catalog_name | db_schema_name | table_name | table_type |"
|
||||
- +--------------+--------------------+------------+------------+
|
||||
- "| public | information_schema | tables | VIEW |"
|
||||
- +--------------+--------------------+------------+------------+
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
@ -587,8 +706,9 @@ async fn flightsql_jdbc() {
|
|||
information_schema, public\n\
|
||||
iox, public\n\
|
||||
system, public";
|
||||
|
||||
// CommandGetTables output
|
||||
let expected_tables = "**************\n\
|
||||
let expected_tables_no_filter = "**************\n\
|
||||
Tables:\n\
|
||||
**************\n\
|
||||
TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS, TYPE_CAT, TYPE_SCHEM, TYPE_NAME, SELF_REFERENCING_COL_NAME, REF_GENERATION\n\
|
||||
|
@ -600,6 +720,14 @@ async fn flightsql_jdbc() {
|
|||
public, iox, the_table, BASE TABLE, null, null, null, null, null, null\n\
|
||||
public, system, queries, BASE TABLE, null, null, null, null, null, null";
|
||||
|
||||
// CommandGetTables output
|
||||
let expected_tables_with_filters = "**************\n\
|
||||
Tables (system table filter):\n\
|
||||
**************\n\
|
||||
TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS, TYPE_CAT, TYPE_SCHEM, TYPE_NAME, SELF_REFERENCING_COL_NAME, REF_GENERATION\n\
|
||||
------------\n\
|
||||
public, system, queries, BASE TABLE, null, null, null, null, null, null";
|
||||
|
||||
// CommandGetTableTypes output
|
||||
let expected_table_types = "**************\n\
|
||||
Table Types:\n\
|
||||
|
@ -617,7 +745,8 @@ async fn flightsql_jdbc() {
|
|||
.success()
|
||||
.stdout(predicate::str::contains(expected_catalogs))
|
||||
.stdout(predicate::str::contains(expected_schemas))
|
||||
.stdout(predicate::str::contains(expected_tables))
|
||||
.stdout(predicate::str::contains(expected_tables_no_filter))
|
||||
.stdout(predicate::str::contains(expected_tables_with_filters))
|
||||
.stdout(predicate::str::contains(expected_table_types));
|
||||
|
||||
let expected_metadata = EXPECTED_METADATA
|
||||
|
|
|
@ -138,10 +138,10 @@ public class Main {
|
|||
// null means no filtering
|
||||
print_result_set(md.getTables(null, null, null, null));
|
||||
|
||||
// System.out.println("**************");
|
||||
// System.out.println("Tables (system table filter):");
|
||||
// System.out.println("**************");
|
||||
// print_result_set(md.getTables("public", "system", null, null));
|
||||
System.out.println("**************");
|
||||
System.out.println("Tables (system table filter):");
|
||||
System.out.println("**************");
|
||||
print_result_set(md.getTables("public", "system", null, null));
|
||||
|
||||
System.out.println("**************");
|
||||
System.out.println("Table Types:");
|
||||
|
|
|
@ -183,16 +183,16 @@ impl FlightSqlClient {
|
|||
/// [`CommandGetTables`]: https://github.com/apache/arrow/blob/44edc27e549d82db930421b0d4c76098941afd71/format/FlightSql.proto#L1176-L1241
|
||||
pub async fn get_tables(
|
||||
&mut self,
|
||||
_catalog: Option<impl Into<String> + Send>,
|
||||
_db_schema_filter_pattern: Option<impl Into<String> + Send>,
|
||||
_table_name_filter_pattern: Option<impl Into<String> + Send>,
|
||||
_table_types: Option<Vec<String>>,
|
||||
catalog: Option<impl Into<String> + Send>,
|
||||
db_schema_filter_pattern: Option<impl Into<String> + Send>,
|
||||
table_name_filter_pattern: Option<impl Into<String> + Send>,
|
||||
table_types: Vec<String>,
|
||||
) -> Result<FlightRecordBatchStream> {
|
||||
let msg = CommandGetTables {
|
||||
catalog: None,
|
||||
db_schema_filter_pattern: None,
|
||||
table_name_filter_pattern: None,
|
||||
table_types: vec![],
|
||||
catalog: catalog.map(|s| s.into()),
|
||||
db_schema_filter_pattern: db_schema_filter_pattern.map(|s| s.into()),
|
||||
table_name_filter_pattern: table_name_filter_pattern.map(|s| s.into()),
|
||||
table_types,
|
||||
// TODO: implement include_schema after optional query parameters are done
|
||||
include_schema: false,
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue