refactor: Cleanup rewriter

pull/24376/head
Stuart Carnie 2023-05-08 11:10:44 +10:00
parent e4b6a8f77a
commit 74b5c32440
1 changed file with 58 additions and 59 deletions
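The change is mechanical but worth spelling out: `from_expand_wildcards` and `field_list_expand_wildcards` previously filled in a `&mut Select` passed by the caller; after this commit they return their results (`Vec<DataSource>` and `(Vec<Field>, Option<GroupByClause>)`), and `map_select` assembles the final `Select` in a single expression. A minimal sketch of that out-parameter-to-return-value pattern, using simplified stand-in types and helper names rather than the real rewriter types:

// Hypothetical stand-in types and helpers; the real rewriter uses
// SchemaProvider, SelectStatement, DataSource, Field, etc.
#[derive(Debug)]
struct Select {
    fields: Vec<String>,
    from: Vec<String>,
}

// Before: a helper mutates the partially built value in place.
#[allow(dead_code)]
fn from_expand_mut(sel: &mut Select) {
    sel.from.push("cpu".to_string());
}

// After: helpers return what they compute, so nothing is half-initialized.
fn from_expand() -> Vec<String> {
    vec!["cpu".to_string()]
}

fn field_list_expand(from: &[String]) -> Vec<String> {
    from.iter().map(|m| format!("{m}.usage")).collect()
}

fn main() {
    // The caller assembles the result once, in a single expression,
    // mirroring the new shape of map_select in this commit.
    let from = from_expand();
    let fields = field_list_expand(&from);
    let sel = Select { fields, from };
    println!("{sel:?}");
}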


@@ -67,22 +67,21 @@ fn has_multiple_measurements(s: &Select) -> bool {
 pub(super) fn map_select(s: &dyn SchemaProvider, stmt: &SelectStatement) -> Result<Select> {
     check_features(stmt)?;
-    let mut sel = Select {
-        fields: vec![],
-        from: vec![],
+    let from = from_expand_wildcards(s, stmt)?;
+    let (mut fields, group_by) = field_list_expand_wildcards(s, stmt, &from)?;
+    field_list_rewrite_aliases(&mut fields)?;
+    Ok(Select {
+        fields,
+        from,
         condition: stmt.condition.clone(),
-        group_by: stmt.group_by.clone(),
+        group_by,
         fill: stmt.fill,
         order_by: stmt.order_by,
         limit: stmt.limit,
         offset: stmt.offset,
         timezone: stmt.timezone.map(|v| *v),
-    };
-    from_expand_wildcards(s, stmt, &mut sel)?;
-    field_list_expand_wildcards(s, stmt, &mut sel)?;
-    field_list_rewrite_aliases(&mut sel.fields)?;
-    Ok(sel)
+    })
 }
 /// Asserts that the `SELECT` statement does not use any unimplemented features.
@@ -155,8 +154,7 @@ fn field_list_normalize_time(stmt: &mut Select) {
 fn from_expand_wildcards(
     s: &dyn SchemaProvider,
     stmt: &SelectStatement,
-    sel: &mut Select,
-) -> Result<()> {
+) -> Result<Vec<DataSource>> {
     let mut new_from = Vec::new();
     for ms in stmt.from.iter() {
         match ms {
@@ -185,8 +183,7 @@ fn from_expand_wildcards(
             }
         }
     }
-    sel.from = new_from;
-    Ok(())
+    Ok(new_from)
 }
 /// Recursively drop any measurements of the `from` clause of `stmt` that do not project
@@ -353,11 +350,11 @@ fn has_wildcards(stmt: &SelectStatement) -> (bool, bool) {
 fn field_list_expand_wildcards(
     s: &dyn SchemaProvider,
     stmt: &SelectStatement,
-    sel: &mut Select,
-) -> Result<()> {
-    sel.fields = stmt.fields.iter().cloned().collect::<Vec<_>>();
+    from: &[DataSource],
+) -> Result<(Vec<Field>, Option<GroupByClause>)> {
+    let mut fields = stmt.fields.iter().cloned().collect::<Vec<_>>();
     // Rewrite all `DISTINCT <identifier>` expressions to `DISTINCT(<var ref>)`
-    if let ControlFlow::Break(e) = sel.fields.iter_mut().try_for_each(|f| {
+    if let ControlFlow::Break(e) = fields.iter_mut().try_for_each(|f| {
         walk_expr_mut::<DataFusionError>(&mut f.expr, &mut |e| {
             if let Expr::Distinct(ident) = e {
                 *e = Expr::Call(Call {
@@ -376,8 +373,8 @@ fn field_list_expand_wildcards(
     // Attempt to rewrite all variable references in the fields with their types, if one
     // hasn't been specified.
-    if let ControlFlow::Break(e) = sel.fields.iter_mut().try_for_each(|f| {
-        let tv = TypeEvaluator::new(s, &sel.from);
+    if let ControlFlow::Break(e) = fields.iter_mut().try_for_each(|f| {
+        let tv = TypeEvaluator::new(s, from);
         walk_expr_mut::<DataFusionError>(&mut f.expr, &mut |e| {
             if let Expr::VarRef(ref mut v) = e {
@@ -394,10 +391,10 @@ fn field_list_expand_wildcards(
     let (has_field_wildcard, has_group_by_wildcard) = has_wildcards(stmt);
     if (has_field_wildcard, has_group_by_wildcard) == (false, false) {
-        return Ok(());
+        return Ok((fields, stmt.group_by.clone()));
     }
-    let (field_set, mut tag_set) = from_field_and_dimensions(s, &sel.from)?;
+    let (field_set, mut tag_set) = from_field_and_dimensions(s, from)?;
     if !has_group_by_wildcard {
         if let Some(group_by) = &stmt.group_by {
@@ -411,31 +408,31 @@ fn field_list_expand_wildcards(
         }
     }
-    let fields = if !field_set.is_empty() {
-        let fields_iter = field_set.iter().map(|(k, v)| VarRef {
-            name: k.clone().into(),
-            data_type: Some(*v),
-        });
+    let fields = if has_field_wildcard {
+        let var_refs = if !field_set.is_empty() {
+            let fields_iter = field_set.iter().map(|(k, v)| VarRef {
+                name: k.clone().into(),
+                data_type: Some(*v),
+            });
-        if !has_group_by_wildcard {
-            fields_iter
-                .chain(tag_set.iter().map(|tag| VarRef {
-                    name: tag.clone().into(),
-                    data_type: Some(VarRefDataType::Tag),
-                }))
-                .sorted()
-                .collect::<Vec<_>>()
+            if !has_group_by_wildcard {
+                fields_iter
+                    .chain(tag_set.iter().map(|tag| VarRef {
+                        name: tag.clone().into(),
+                        data_type: Some(VarRefDataType::Tag),
+                    }))
+                    .sorted()
+                    .collect::<Vec<_>>()
-        } else {
-            fields_iter.sorted().collect::<Vec<_>>()
-        }
+            } else {
+                fields_iter.sorted().collect::<Vec<_>>()
+            }
-    } else {
-        vec![]
-    };
+        } else {
+            vec![]
+        };
-    if has_field_wildcard {
         let mut new_fields = Vec::new();
-        for f in &sel.fields {
+        for f in &fields {
             let add_field = |f: &VarRef| {
                 new_fields.push(Field {
                     expr: Expr::VarRef(f.clone()),
@@ -455,12 +452,12 @@ fn field_list_expand_wildcards(
                         }
                     };
-                    fields.iter().filter(filter).for_each(add_field);
+                    var_refs.iter().filter(filter).for_each(add_field);
                 }
                 Expr::Literal(Literal::Regex(re)) => {
                     let re = util::parse_regex(re)?;
-                    fields
+                    var_refs
                         .iter()
                         .filter(|v| re.is_match(v.name.as_str()))
                         .for_each(add_field);
@@ -523,14 +520,14 @@ fn field_list_expand_wildcards(
                             ));
                         }
                         Some(Expr::Wildcard(_)) => {
-                            fields
+                            var_refs
                                 .iter()
                                 .filter(|v| supported_types.contains(&v.data_type))
                                 .for_each(add_field);
                         }
                         Some(Expr::Literal(Literal::Regex(re))) => {
                             let re = util::parse_regex(re)?;
-                            fields
+                            var_refs
                                 .iter()
                                 .filter(|v| {
                                     supported_types.contains(&v.data_type)
@@ -570,17 +567,15 @@ fn field_list_expand_wildcards(
             }
         }
-        sel.fields = new_fields;
-    }
+        new_fields
+    } else {
+        fields
+    };
-    if has_group_by_wildcard {
-        let group_by_tags = if has_group_by_wildcard {
-            tag_set.into_iter().sorted().collect::<Vec<_>>()
-        } else {
-            vec![]
-        };
-        if let Some(group_by) = &stmt.group_by {
+    let group_by = match (&stmt.group_by, has_group_by_wildcard) {
+        // GROUP BY with a wildcard
+        (Some(group_by), true) => {
+            let group_by_tags = tag_set.into_iter().sorted().collect::<Vec<_>>();
             let mut new_dimensions = Vec::new();
             for dim in group_by.iter() {
@@ -603,11 +598,15 @@ fn field_list_expand_wildcards(
                     _ => new_dimensions.push(dim.clone()),
                 }
             }
-            sel.group_by = Some(GroupByClause::new(new_dimensions));
+            Some(GroupByClause::new(new_dimensions))
         }
-    }
+        // GROUP BY no wildcard
+        (Some(group_by), false) => Some(group_by.clone()),
+        // No GROUP BY
+        (None, _) => None,
+    };
-    Ok(())
+    Ok((fields, group_by))
 }
 /// Resolve the projection list column names in accordance with the