diff --git a/iox_query/src/plan/influxql/expr_type_evaluator.rs b/iox_query/src/plan/influxql/expr_type_evaluator.rs index ff05735f45..f696940ae2 100644 --- a/iox_query/src/plan/influxql/expr_type_evaluator.rs +++ b/iox_query/src/plan/influxql/expr_type_evaluator.rs @@ -1,31 +1,32 @@ use crate::plan::influxql::field::field_by_name; -use crate::plan::influxql::field_mapper::FieldMapper; +use crate::plan::influxql::field_mapper::map_type; use datafusion::common::{DataFusionError, Result}; use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName}; use influxdb_influxql_parser::expression::{Expr, UnaryOperator, VarRefDataType}; use influxdb_influxql_parser::literal::Literal; use influxdb_influxql_parser::select::{Dimension, FromMeasurementClause, MeasurementSelection}; use itertools::Itertools; +use predicate::rpc_predicate::QueryNamespaceMeta; /// Evaluate the type of the specified expression. /// /// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4796-L4797). pub(crate) fn evaluate_type( + namespace: &dyn QueryNamespaceMeta, expr: &Expr, from: &FromMeasurementClause, - fm: &dyn FieldMapper, ) -> Result> { - TypeEvaluator::new(from, fm).eval_type(expr) + TypeEvaluator::new(from, namespace).eval_type(expr) } struct TypeEvaluator<'a> { - fm: &'a dyn FieldMapper, + namespace: &'a dyn QueryNamespaceMeta, from: &'a FromMeasurementClause, } impl<'a> TypeEvaluator<'a> { - fn new(from: &'a FromMeasurementClause, fm: &'a dyn FieldMapper) -> Self { - Self { from, fm } + fn new(from: &'a FromMeasurementClause, namespace: &'a dyn QueryNamespaceMeta) -> Self { + Self { from, namespace } } fn eval_type(&self, expr: &Expr) -> Result> { @@ -82,7 +83,7 @@ impl<'a> TypeEvaluator<'a> { MeasurementSelection::Name(QualifiedMeasurementName { name: MeasurementName::Name(ident), .. - }) => match (data_type, self.fm.map_type(ident.as_str(), name)?) { + }) => match (data_type, map_type(self.namespace, ident.as_str(), name)?) { (Some(existing), Some(res)) => { if res < existing { data_type = Some(res) @@ -96,7 +97,7 @@ impl<'a> TypeEvaluator<'a> { if let Some(field) = field_by_name(select, name) { match ( data_type, - evaluate_type(&field.expr, &select.from, self.fm)?, + evaluate_type(self.namespace, &field.expr, &select.from)?, ) { (Some(existing), Some(res)) => { if res < existing { @@ -154,123 +155,159 @@ impl<'a> TypeEvaluator<'a> { #[cfg(test)] mod test { use crate::plan::influxql::expr_type_evaluator::evaluate_type; - use crate::plan::influxql::field_mapper::{FieldMapper, SchemaFieldMapper}; - use crate::plan::influxql::test_utils::{parse_select, MockSchemaProvider}; + use crate::plan::influxql::test_utils::{parse_select, MockNamespace}; use assert_matches::assert_matches; use influxdb_influxql_parser::expression::VarRefDataType; #[test] fn test_evaluate_type() { - let fm = - &SchemaFieldMapper::new(MockSchemaProvider::new_schema_provider()) as &dyn FieldMapper; + let namespace = MockNamespace::default(); let stmt = parse_select("SELECT shared_field0 FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Float); let stmt = parse_select("SELECT shared_tag0 FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Tag); // Unknown let stmt = parse_select("SELECT not_exists FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from).unwrap(); assert!(res.is_none()); let stmt = parse_select("SELECT shared_field0 FROM temp_02"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Integer); let stmt = parse_select("SELECT shared_field0 FROM temp_02"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Integer); // Same field across multiple measurements resolves to the highest precedence (float) let stmt = parse_select("SELECT shared_field0 FROM temp_01, temp_02"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Float); // Explicit cast of integer field to float let stmt = parse_select("SELECT SUM(field_i64::float) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Float); // data types for functions let stmt = parse_select("SELECT SUM(field_f64) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Float); let stmt = parse_select("SELECT SUM(field_i64) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Integer); let stmt = parse_select("SELECT MIN(field_f64) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Float); let stmt = parse_select("SELECT MAX(field_i64) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Integer); let stmt = parse_select("SELECT FIRST(field_str) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::String); let stmt = parse_select("SELECT LAST(field_str) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::String); let stmt = parse_select("SELECT MEAN(field_i64) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Float); let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Integer); let stmt = parse_select("SELECT COUNT(field_i64) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Integer); let stmt = parse_select("SELECT COUNT(field_str) FROM temp_01"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Integer); // subqueries let stmt = parse_select("SELECT inner FROM (SELECT field_f64 as inner FROM temp_01)"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Float); let stmt = parse_select("SELECT inner FROM (SELECT shared_tag0, field_f64 as inner FROM temp_01)"); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Float); let stmt = parse_select( "SELECT shared_tag0, inner FROM (SELECT shared_tag0, field_f64 as inner FROM temp_01)", ); let field = stmt.fields.head().unwrap(); - let res = evaluate_type(&field.expr, &stmt.from, fm).unwrap().unwrap(); + let res = evaluate_type(&namespace, &field.expr, &stmt.from) + .unwrap() + .unwrap(); assert_matches!(res, VarRefDataType::Tag); } } diff --git a/iox_query/src/plan/influxql/field_mapper.rs b/iox_query/src/plan/influxql/field_mapper.rs index fbdebeb8db..6a40108ea2 100644 --- a/iox_query/src/plan/influxql/field_mapper.rs +++ b/iox_query/src/plan/influxql/field_mapper.rs @@ -1,96 +1,65 @@ #![allow(dead_code)] use crate::plan::influxql::var_ref::field_type_to_var_ref_data_type; -use datafusion::catalog::schema::SchemaProvider; -use datafusion::common::{DataFusionError, Result}; +use datafusion::common::Result; use influxdb_influxql_parser::expression::VarRefDataType; -use schema::{InfluxColumnType, Schema}; +use predicate::rpc_predicate::QueryNamespaceMeta; +use schema::InfluxColumnType; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; pub(crate) type FieldTypeMap = HashMap; pub(crate) type TagSet = HashSet; -/// Represents an InfluxQL schema for determining the fields and tags -/// of a measurement. -pub(crate) trait FieldMapper { - /// Determine the fields and tags for the given measurement name. - fn field_and_dimensions(&self, name: &str) -> Result>; - /// Determine the [`VarRefDataType`] for the given field name. - fn map_type(&self, name: &str, field: &str) -> Result>; -} - -pub(crate) struct SchemaFieldMapper { - schema: Arc, -} - -impl SchemaFieldMapper { - pub(crate) fn new(schema: Arc) -> Self { - Self { schema } - } - - fn get_schema(&self, name: &str) -> Result> { - Ok(Some( - Schema::try_from(match self.schema.table(name) { - Some(t) => t.schema(), - None => return Ok(None), - }) - .map_err(|e| { - DataFusionError::Internal(format!("Unable to create IOx schema: {}", e)) - })?, - )) +pub(crate) fn field_and_dimensions( + namespace: &dyn QueryNamespaceMeta, + name: &str, +) -> Result> { + match namespace.table_schema(name) { + Some(iox) => Ok(Some(( + FieldTypeMap::from_iter(iox.iter().filter_map(|(col_type, f)| match col_type { + InfluxColumnType::Field(ft) => { + Some((f.name().clone(), field_type_to_var_ref_data_type(ft))) + } + _ => None, + })), + iox.tags_iter() + .map(|f| f.name().clone()) + .collect::(), + ))), + None => Ok(None), } } -impl FieldMapper for SchemaFieldMapper { - fn field_and_dimensions(&self, name: &str) -> Result> { - match self.get_schema(name)? { - Some(iox) => Ok(Some(( - FieldTypeMap::from_iter(iox.iter().filter_map(|(col_type, f)| match col_type { - InfluxColumnType::Field(ft) => { - Some((f.name().clone(), field_type_to_var_ref_data_type(ft))) - } - _ => None, - })), - iox.tags_iter() - .map(|f| f.name().clone()) - .collect::(), - ))), - None => Ok(None), - } - } - - fn map_type(&self, measurement_name: &str, field: &str) -> Result> { - match self.get_schema(measurement_name)? { - Some(iox) => Ok(match iox.find_index_of(field) { - Some(i) => match iox.field(i).0 { - InfluxColumnType::Field(ft) => Some(field_type_to_var_ref_data_type(ft)), - InfluxColumnType::Tag => Some(VarRefDataType::Tag), - InfluxColumnType::Timestamp => None, - }, - None => None, - }), - None => Ok(None), - } +pub(crate) fn map_type( + namespace: &dyn QueryNamespaceMeta, + measurement_name: &str, + field: &str, +) -> Result> { + match namespace.table_schema(measurement_name) { + Some(iox) => Ok(match iox.find_index_of(field) { + Some(i) => match iox.field(i).0 { + InfluxColumnType::Field(ft) => Some(field_type_to_var_ref_data_type(ft)), + InfluxColumnType::Tag => Some(VarRefDataType::Tag), + InfluxColumnType::Timestamp => None, + }, + None => None, + }), + None => Ok(None), } } #[cfg(test)] mod test { - use crate::plan::influxql::field_mapper::{ - FieldMapper, FieldTypeMap, SchemaFieldMapper, TagSet, - }; - use crate::plan::influxql::test_utils::MockSchemaProvider; + use super::*; + use crate::plan::influxql::test_utils::MockNamespace; use assert_matches::assert_matches; - use influxdb_influxql_parser::expression::VarRefDataType; #[test] fn test_schema_field_mapper() { - let fm = - &SchemaFieldMapper::new(MockSchemaProvider::new_schema_provider()) as &dyn FieldMapper; + let namespace = MockNamespace::default(); // Measurement exists - let (field_set, tag_set) = fm.field_and_dimensions("cpu").unwrap().unwrap(); + let (field_set, tag_set) = field_and_dimensions(&namespace, "cpu").unwrap().unwrap(); assert_eq!( field_set, FieldTypeMap::from([ @@ -105,22 +74,26 @@ mod test { ); // Measurement does not exist - assert!(fm.field_and_dimensions("cpu2").unwrap().is_none()); + assert!(field_and_dimensions(&namespace, "cpu2").unwrap().is_none()); // `map_type` API calls // Returns expected type assert_matches!( - fm.map_type("cpu", "usage_user").unwrap(), + map_type(&namespace, "cpu", "usage_user").unwrap(), Some(VarRefDataType::Float) ); assert_matches!( - fm.map_type("cpu", "host").unwrap(), + map_type(&namespace, "cpu", "host").unwrap(), Some(VarRefDataType::Tag) ); // Returns None for nonexistent field - assert!(fm.map_type("cpu", "nonexistent").unwrap().is_none()); + assert!(map_type(&namespace, "cpu", "nonexistent") + .unwrap() + .is_none()); // Returns None for nonexistent measurement - assert!(fm.map_type("nonexistent", "usage").unwrap().is_none()); + assert!(map_type(&namespace, "nonexistent", "usage") + .unwrap() + .is_none()); } } diff --git a/iox_query/src/plan/influxql/rewriter.rs b/iox_query/src/plan/influxql/rewriter.rs index 55d48e655c..3a42e930ef 100644 --- a/iox_query/src/plan/influxql/rewriter.rs +++ b/iox_query/src/plan/influxql/rewriter.rs @@ -2,8 +2,7 @@ use crate::plan::influxql::expr_type_evaluator::evaluate_type; use crate::plan::influxql::field::field_name; -use crate::plan::influxql::field_mapper::{FieldMapper, FieldTypeMap, SchemaFieldMapper, TagSet}; -use datafusion::catalog::schema::SchemaProvider; +use crate::plan::influxql::field_mapper::{field_and_dimensions, FieldTypeMap, TagSet}; use datafusion::common::{DataFusionError, Result}; use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName}; use influxdb_influxql_parser::expression::{Expr, VarRefDataType, WildcardType}; @@ -16,11 +15,11 @@ use influxdb_influxql_parser::select::{ use influxdb_influxql_parser::string::Regex; use influxdb_influxql_parser::visit::{Recursion, Visitable, Visitor, VisitorResult}; use itertools::Itertools; +use predicate::rpc_predicate::QueryNamespaceMeta; use query_functions::clean_non_meta_escapes; use std::borrow::Borrow; use std::collections::{HashMap, HashSet}; use std::ops::Deref; -use std::sync::Arc; fn parse_regex(re: &Regex) -> Result { let pattern = clean_non_meta_escapes(re.as_str()); @@ -30,9 +29,8 @@ fn parse_regex(re: &Regex) -> Result { } /// Recursively expand the `from` clause of `stmt` and any subqueries. -fn rewrite_from(stmt: &mut SelectStatement, schema: Arc) -> Result<()> { +fn rewrite_from(namespace: &dyn QueryNamespaceMeta, stmt: &mut SelectStatement) -> Result<()> { let mut new_from = Vec::new(); - let schema = &Arc::clone(&schema); for ms in stmt.from.iter() { match ms { MeasurementSelection::Name(qmn) => match qmn { @@ -40,7 +38,7 @@ fn rewrite_from(stmt: &mut SelectStatement, schema: Arc) -> name: MeasurementName::Name(name), .. } => { - if schema.table(name).is_some() { + if namespace.table_schema(name).is_some() { new_from.push(ms.clone()) } } @@ -49,7 +47,7 @@ fn rewrite_from(stmt: &mut SelectStatement, schema: Arc) -> .. } => { let re = parse_regex(re)?; - schema + namespace .table_names() .into_iter() .filter(|table| re.is_match(table.as_str())) @@ -64,7 +62,7 @@ fn rewrite_from(stmt: &mut SelectStatement, schema: Arc) -> }, MeasurementSelection::Subquery(q) => { let mut q = *q.clone(); - rewrite_from(&mut q, Arc::clone(schema))?; + rewrite_from(namespace, &mut q)?; new_from.push(MeasurementSelection::Subquery(Box::new(q))) } } @@ -75,12 +73,11 @@ fn rewrite_from(stmt: &mut SelectStatement, schema: Arc) -> /// Determine the merged fields and tags of the `FROM` clause. fn from_field_and_dimensions( + namespace: &dyn QueryNamespaceMeta, from: &FromMeasurementClause, - schema: Arc, ) -> Result<(FieldTypeMap, TagSet)> { let mut fs = FieldTypeMap::new(); let mut ts = TagSet::new(); - let fm = &SchemaFieldMapper::new(schema) as &dyn FieldMapper; for ms in from.deref() { match ms { @@ -88,7 +85,7 @@ fn from_field_and_dimensions( name: MeasurementName::Name(name), .. }) => { - let (field_set, tag_set) = match fm.field_and_dimensions(name.as_str())? { + let (field_set, tag_set) = match field_and_dimensions(namespace, name.as_str())? { Some(res) => res, None => continue, }; @@ -111,7 +108,7 @@ fn from_field_and_dimensions( } MeasurementSelection::Subquery(select) => { for f in select.fields.iter() { - let dt = match evaluate_type(&f.expr, &select.from, fm)? { + let dt = match evaluate_type(namespace, &f.expr, &select.from)? { Some(dt) => dt, None => continue, }; @@ -238,21 +235,23 @@ pub(crate) fn walk_expr(expr: &Expr, visit: &mut impl FnMut(&Expr) -> Result<()> /// underlying schema. /// /// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L1185). -fn rewrite_field_list(stmt: &mut SelectStatement, schema: Arc) -> Result<()> { +fn rewrite_field_list( + namespace: &dyn QueryNamespaceMeta, + stmt: &mut SelectStatement, +) -> Result<()> { // Iterate through the `FROM` clause and rewrite any subqueries first. for ms in stmt.from.iter_mut() { if let MeasurementSelection::Subquery(subquery) = ms { - rewrite_field_list(subquery, Arc::clone(&schema))?; + rewrite_field_list(namespace, subquery)?; } } // Attempt to rewrite all variable references in the fields with their types, if one // hasn't been specified. - let fm = &SchemaFieldMapper::new(Arc::clone(&schema)) as &dyn FieldMapper; stmt.fields.iter_mut().try_for_each(|f| { walk_expr_mut(&mut f.expr, &mut |e| { if matches!(e, Expr::VarRef { .. }) { - let new_type = evaluate_type(e.borrow(), &stmt.from, fm)?; + let new_type = evaluate_type(namespace, e.borrow(), &stmt.from)?; if let Expr::VarRef { data_type, .. } = e { *data_type = new_type; @@ -267,7 +266,7 @@ fn rewrite_field_list(stmt: &mut SelectStatement, schema: Arc { return Err(DataFusionError::External( format!("unable to use tag as wildcard in {}()", name).into(), - )) + )); } Some(Expr::Wildcard(_)) => { fields @@ -527,12 +526,12 @@ fn rewrite_field_list_aliases(field_list: &mut FieldList) -> Result<()> { /// Recursively rewrite the specified [`SelectStatement`], expanding any wildcards or regular expressions /// found in the projection list, `FROM` clause or `GROUP BY` clause. pub(crate) fn rewrite_statement( + namespace: &dyn QueryNamespaceMeta, q: &SelectStatement, - schema: Arc, ) -> Result { let mut stmt = q.clone(); - rewrite_from(&mut stmt, Arc::clone(&schema))?; - rewrite_field_list(&mut stmt, schema)?; + rewrite_from(namespace, &mut stmt)?; + rewrite_field_list(namespace, &mut stmt)?; rewrite_field_list_aliases(&mut stmt.fields)?; Ok(stmt) @@ -541,7 +540,7 @@ pub(crate) fn rewrite_statement( #[cfg(test)] mod test { use crate::plan::influxql::rewriter::{has_wildcards, rewrite_statement, walk_expr_mut}; - use crate::plan::influxql::test_utils::{get_first_field, MockSchemaProvider}; + use crate::plan::influxql::test_utils::{get_first_field, MockNamespace}; use influxdb_influxql_parser::expression::Expr; use influxdb_influxql_parser::literal::Literal; use influxdb_influxql_parser::parse_statements; @@ -559,9 +558,10 @@ mod test { #[test] fn test_rewrite_statement() { + let namespace = MockNamespace::default(); // Exact, match let stmt = parse_select("SELECT usage_user FROM cpu"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_user::float AS usage_user FROM cpu" @@ -569,7 +569,7 @@ mod test { // Duplicate columns do not have conflicting aliases let stmt = parse_select("SELECT usage_user, usage_user FROM cpu"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_user::float AS usage_user, usage_user::float AS usage_user_1 FROM cpu" @@ -577,7 +577,7 @@ mod test { // Multiple aliases with no conflicts let stmt = parse_select("SELECT usage_user as usage_user_1, usage_user FROM cpu"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user FROM cpu" @@ -586,14 +586,14 @@ mod test { // Multiple aliases with conflicts let stmt = parse_select("SELECT usage_user as usage_user_1, usage_user, usage_user, usage_user as usage_user_2, usage_user, usage_user_2 FROM cpu"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(stmt.to_string(), "SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user, usage_user::float AS usage_user_3, usage_user::float AS usage_user_2, usage_user::float AS usage_user_4, usage_user_2 AS usage_user_2_1 FROM cpu"); // Rewriting FROM clause // Regex, match let stmt = parse_select("SELECT bytes_free FROM /d/"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT bytes_free::integer AS bytes_free FROM disk, diskio" @@ -601,26 +601,26 @@ mod test { // Exact, no match let stmt = parse_select("SELECT usage_idle FROM foo"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert!(stmt.from.is_empty()); // Regex, no match let stmt = parse_select("SELECT bytes_free FROM /^d$/"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert!(stmt.from.is_empty()); // Rewriting projection list // Single wildcard, single measurement let stmt = parse_select("SELECT * FROM cpu"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu" ); let stmt = parse_select("SELECT * FROM cpu, disk"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk" @@ -628,7 +628,7 @@ mod test { // Regular expression selects fields from multiple measurements let stmt = parse_select("SELECT /usage|bytes/ FROM cpu, disk"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk" @@ -636,7 +636,7 @@ mod test { // Selective wildcard for tags let stmt = parse_select("SELECT *::tag FROM cpu"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT host::tag AS host, region::tag AS region FROM cpu" @@ -644,7 +644,7 @@ mod test { // Selective wildcard for fields let stmt = parse_select("SELECT *::field FROM cpu"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu" @@ -652,7 +652,7 @@ mod test { // Mixed fields and wildcards let stmt = parse_select("SELECT usage_idle, *::tag FROM cpu"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_idle::float AS usage_idle, host::tag AS host, region::tag AS region FROM cpu" @@ -661,14 +661,14 @@ mod test { // GROUP BY expansion let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY host"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host" ); let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY *"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host, region" @@ -678,14 +678,14 @@ mod test { // Invalid regex let stmt = parse_select("SELECT usage_idle FROM /(not/"); - let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err(); + let err = rewrite_statement(&namespace, &stmt).unwrap_err(); assert_contains!(err.to_string(), "invalid regular expression"); // Subqueries // Subquery, exact, match let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM cpu)"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_idle::float AS usage_idle FROM (SELECT usage_idle::float FROM cpu)" @@ -693,7 +693,7 @@ mod test { // Subquery, regex, match let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free FROM /d/)"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT bytes_free::integer AS bytes_free FROM (SELECT bytes_free::integer FROM disk, diskio)" @@ -701,7 +701,7 @@ mod test { // Subquery, exact, no match let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM foo)"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT usage_idle AS usage_idle FROM (SELECT usage_idle )" @@ -709,7 +709,7 @@ mod test { // Subquery, regex, no match let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free FROM /^d$/)"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT bytes_free AS bytes_free FROM (SELECT bytes_free )" @@ -717,7 +717,7 @@ mod test { // Binary expression let stmt = parse_select("SELECT bytes_free+bytes_used FROM disk"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT bytes_free::integer + bytes_used::integer AS bytes_free_bytes_used FROM disk" @@ -725,7 +725,7 @@ mod test { // Unary expressions let stmt = parse_select("SELECT -bytes_free FROM disk"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT -bytes_free::integer AS bytes_free FROM disk" @@ -734,7 +734,7 @@ mod test { // Call expressions let stmt = parse_select("SELECT COUNT(field_i64) FROM temp_01"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT COUNT(field_i64::integer) AS COUNT FROM temp_01" @@ -742,14 +742,14 @@ mod test { // Duplicate aggregate columns let stmt = parse_select("SELECT COUNT(field_i64), COUNT(field_i64) FROM temp_01"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT COUNT(field_i64::integer) AS COUNT, COUNT(field_i64::integer) AS COUNT_1 FROM temp_01" ); let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT COUNT(field_f64::float) AS COUNT FROM temp_01" @@ -757,7 +757,7 @@ mod test { // Expands all fields let stmt = parse_select("SELECT COUNT(*) FROM temp_01"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT COUNT(field_f64::float) AS COUNT_field_f64, COUNT(field_i64::integer) AS COUNT_field_i64, COUNT(field_str::string) AS COUNT_field_str, COUNT(shared_field0::float) AS COUNT_shared_field0 FROM temp_01" @@ -765,7 +765,7 @@ mod test { // Expands matching fields let stmt = parse_select("SELECT COUNT(/64$/) FROM temp_01"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT COUNT(field_f64::float) AS COUNT_field_f64, COUNT(field_i64::integer) AS COUNT_field_i64 FROM temp_01" @@ -773,7 +773,7 @@ mod test { // Expands only numeric fields let stmt = parse_select("SELECT SUM(*) FROM temp_01"); - let stmt = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap(); + let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!( stmt.to_string(), "SELECT SUM(field_f64::float) AS SUM_field_f64, SUM(field_i64::integer) AS SUM_field_i64, SUM(shared_field0::float) AS SUM_shared_field0 FROM temp_01" @@ -782,14 +782,14 @@ mod test { // Fallible cases let stmt = parse_select("SELECT *::field + *::tag FROM cpu"); - let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err(); + let err = rewrite_statement(&namespace, &stmt).unwrap_err(); assert_eq!( err.to_string(), "External error: unsupported expression: contains a wildcard or regular expression" ); let stmt = parse_select("SELECT COUNT(*::tag) FROM cpu"); - let err = rewrite_statement(&stmt, MockSchemaProvider::new_schema_provider()).unwrap_err(); + let err = rewrite_statement(&namespace, &stmt).unwrap_err(); assert_eq!( err.to_string(), "External error: unable to use tag as wildcard in COUNT()" diff --git a/iox_query/src/plan/influxql/test_utils.rs b/iox_query/src/plan/influxql/test_utils.rs index 85e14575d2..d7ef50ac88 100644 --- a/iox_query/src/plan/influxql/test_utils.rs +++ b/iox_query/src/plan/influxql/test_utils.rs @@ -3,31 +3,13 @@ use crate::test::TestChunk; use crate::QueryChunkMeta; -use arrow::datatypes::SchemaRef; -use async_trait::async_trait; -use datafusion::catalog::schema::SchemaProvider; -use datafusion::datasource::{TableProvider, TableType}; -use datafusion::error::Result; -use datafusion::execution::context::SessionState; -use datafusion::logical_expr::Expr; -use datafusion::physical_plan::ExecutionPlan; use influxdb_influxql_parser::parse_statements; use influxdb_influxql_parser::select::{Field, SelectStatement}; use influxdb_influxql_parser::statement::Statement; -use itertools::Itertools; -use std::any::Any; +use predicate::rpc_predicate::QueryNamespaceMeta; +use schema::Schema; use std::sync::Arc; -struct EmptyTable { - table_schema: SchemaRef, -} - -impl EmptyTable { - pub(crate) fn new(table_schema: SchemaRef) -> Self { - Self { table_schema } - } -} - /// Returns the first `Field` of the `SELECT` statement. pub(crate) fn get_first_field(s: &str) -> Field { parse_select(s).fields.head().unwrap().clone() @@ -42,42 +24,13 @@ pub(crate) fn parse_select(s: &str) -> SelectStatement { } } -#[async_trait] -impl TableProvider for EmptyTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.table_schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - _ctx: &SessionState, - _projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> Result> { - unimplemented!() - } +pub(crate) struct MockNamespace { + chunks: Vec, } -pub(crate) struct MockSchemaProvider {} - -impl MockSchemaProvider { - /// Convenience constructor to return a new instance of [`Self`] as a dynamic [`SchemaProvider`]. - pub(crate) fn new_schema_provider() -> Arc { - Arc::new(Self {}) - } - - /// Return the chunks that make up the test database. - pub(crate) fn table_chunks() -> Vec { - vec![ +impl Default for MockNamespace { + fn default() -> Self { + let chunks = vec![ TestChunk::new("cpu") .with_quiet() .with_tag_column("host") @@ -120,36 +73,21 @@ impl MockSchemaProvider { .with_tag_column("shared_tag0") .with_tag_column("shared_tag1") .with_string_field_column_with_stats("shared_field0", None, None), - ] + ]; + Self { chunks } } } -impl SchemaProvider for MockSchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - +impl QueryNamespaceMeta for MockNamespace { fn table_names(&self) -> Vec { - Self::table_chunks() + self.chunks .iter() - .map(|c| c.table_name().into()) - .sorted() - .collect::>() + .map(|x| x.table_name().to_string()) + .collect() } - fn table(&self, name: &str) -> Option> { - let schema = Self::table_chunks() - .iter() - .find(|c| c.table_name() == name) - .map(|c| c.schema()); - - match schema { - Some(s) => Some(Arc::new(EmptyTable::new(Arc::clone(s.inner())))), - None => None, - } - } - - fn table_exist(&self, name: &str) -> bool { - self.table_names().contains(&name.to_string()) + fn table_schema(&self, table_name: &str) -> Option> { + let c = self.chunks.iter().find(|x| x.table_name() == table_name)?; + Some(c.schema()) } }