refactor: Split up rpc_predicate module a bit (#4763)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-06-01 15:56:11 -04:00 committed by GitHub
parent 7328cc6a9a
commit a37c553545
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 119 additions and 97 deletions

View File

@ -1,20 +1,23 @@
//! Interface logic between IOx ['Predicate`] and the predicates used
//! by the InfluxDB Storage gRPC API
mod field_rewrite;
mod measurement_rewrite;
mod value_rewrite;
use crate::{rewrite, Predicate, ValueExpr};
use crate::{rewrite, Predicate};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::{
lit, Column, Expr, ExprRewritable, ExprRewriter, ExprSchema, ExprSchemable, ExprSimplifiable,
SimplifyInfo,
Column, Expr, ExprSchema, ExprSchemable, ExprSimplifiable, SimplifyInfo,
};
use schema::Schema;
use std::collections::BTreeSet;
use std::sync::Arc;
use self::field_rewrite::{FieldProjection, FieldProjectionRewriter};
use self::measurement_rewrite::rewrite_measurement_references;
use self::value_rewrite::rewrite_field_value_references;
/// Any column references to this name are rewritten to be
/// the actual table name by the Influx gRPC planner.
@ -283,108 +286,15 @@ impl<'a> ExprSchema for SimplifyAdapter<'a> {
}
}
/// Rewrites all references to the [MEASUREMENT_COLUMN_NAME] column
/// with the actual table name
fn rewrite_measurement_references(table_name: &str, expr: Expr) -> DataFusionResult<Expr> {
let mut rewriter = MeasurementRewriter { table_name };
expr.rewrite(&mut rewriter)
}
struct MeasurementRewriter<'a> {
table_name: &'a str,
}
impl ExprRewriter for MeasurementRewriter<'_> {
fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
Ok(match expr {
// rewrite col("_measurement") --> "table_name"
Expr::Column(Column { relation, name }) if name == MEASUREMENT_COLUMN_NAME => {
// should not have a qualified foo._measurement
// reference
assert!(relation.is_none());
lit(self.table_name)
}
// no rewrite needed
_ => expr,
})
}
}
/// Rewrites an expression on `_value` as a boolean true literal, pushing any
/// encountered expressions onto `value_exprs` so they can be moved onto column
/// projections.
fn rewrite_field_value_references(
value_exprs: &mut Vec<ValueExpr>,
expr: Expr,
) -> DataFusionResult<Expr> {
let mut rewriter = FieldValueRewriter { value_exprs };
expr.rewrite(&mut rewriter)
}
struct FieldValueRewriter<'a> {
value_exprs: &'a mut Vec<ValueExpr>,
}
impl<'a> ExprRewriter for FieldValueRewriter<'a> {
fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
// try and convert Expr into a ValueExpr
match expr.try_into() {
// found a value expr. Save and replace with true
Ok(value_expr) => {
self.value_exprs.push(value_expr);
Ok(lit(true))
}
// not a ValueExpr, so leave the same
Err(expr) => Ok(expr),
}
}
}
#[cfg(test)]
mod tests {
use crate::PredicateBuilder;
use super::*;
use arrow::datatypes::DataType;
use datafusion::logical_plan::col;
use datafusion::logical_plan::{col, lit};
use test_helpers::assert_contains;
#[test]
fn test_field_value_rewriter() {
let mut rewriter = FieldValueRewriter {
value_exprs: &mut vec![],
};
let cases = vec![
(col("f1").eq(lit(1.82)), col("f1").eq(lit(1.82)), vec![]),
(col("t2"), col("t2"), vec![]),
(
col(VALUE_COLUMN_NAME).eq(lit(1.82)),
// _value = 1.82 -> true
lit(true),
vec![ValueExpr {
expr: col(VALUE_COLUMN_NAME).eq(lit(1.82)),
}],
),
];
for (input, exp, mut value_exprs) in cases {
let rewritten = input.rewrite(&mut rewriter).unwrap();
assert_eq!(rewritten, exp);
assert_eq!(rewriter.value_exprs, &mut value_exprs);
}
// Test case with single field.
let mut rewriter = FieldValueRewriter {
value_exprs: &mut vec![],
};
let input = col(VALUE_COLUMN_NAME).gt(lit(1.88));
let rewritten = input.clone().rewrite(&mut rewriter).unwrap();
assert_eq!(rewritten, lit(true));
assert_eq!(rewriter.value_exprs, &mut vec![ValueExpr { expr: input }]);
}
#[test]
fn test_normalize_predicate_field_rewrite() {
let predicate = normalize_predicate(

View File

@ -0,0 +1,34 @@
use datafusion::error::Result as DataFusionResult;
use datafusion::logical_plan::{lit, Column, Expr, ExprRewritable, ExprRewriter};
use super::MEASUREMENT_COLUMN_NAME;
/// Rewrites all references to the [MEASUREMENT_COLUMN_NAME] column
/// with the actual table name
pub(crate) fn rewrite_measurement_references(
table_name: &str,
expr: Expr,
) -> DataFusionResult<Expr> {
let mut rewriter = MeasurementRewriter { table_name };
expr.rewrite(&mut rewriter)
}
struct MeasurementRewriter<'a> {
table_name: &'a str,
}
impl ExprRewriter for MeasurementRewriter<'_> {
fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
Ok(match expr {
// rewrite col("_measurement") --> "table_name"
Expr::Column(Column { relation, name }) if name == MEASUREMENT_COLUMN_NAME => {
// should not have a qualified foo._measurement
// reference
assert!(relation.is_none());
lit(self.table_name)
}
// no rewrite needed
_ => expr,
})
}
}

View File

@ -0,0 +1,78 @@
use datafusion::error::Result as DataFusionResult;
use datafusion::logical_plan::{lit, Expr, ExprRewritable, ExprRewriter};
use crate::ValueExpr;
/// Rewrites an expression on `_value` as a boolean true literal, pushing any
/// encountered expressions onto `value_exprs` so they can be moved onto column
/// projections.
pub(crate) fn rewrite_field_value_references(
value_exprs: &mut Vec<ValueExpr>,
expr: Expr,
) -> DataFusionResult<Expr> {
let mut rewriter = FieldValueRewriter { value_exprs };
expr.rewrite(&mut rewriter)
}
struct FieldValueRewriter<'a> {
value_exprs: &'a mut Vec<ValueExpr>,
}
impl<'a> ExprRewriter for FieldValueRewriter<'a> {
fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
// try and convert Expr into a ValueExpr
match expr.try_into() {
// found a value expr. Save and replace with true
Ok(value_expr) => {
self.value_exprs.push(value_expr);
Ok(lit(true))
}
// not a ValueExpr, so leave the same
Err(expr) => Ok(expr),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::rpc_predicate::VALUE_COLUMN_NAME;
use datafusion::logical_plan::col;
#[test]
fn test_field_value_rewriter() {
let mut rewriter = FieldValueRewriter {
value_exprs: &mut vec![],
};
let cases = vec![
(col("f1").eq(lit(1.82)), col("f1").eq(lit(1.82)), vec![]),
(col("t2"), col("t2"), vec![]),
(
col(VALUE_COLUMN_NAME).eq(lit(1.82)),
// _value = 1.82 -> true
lit(true),
vec![ValueExpr {
expr: col(VALUE_COLUMN_NAME).eq(lit(1.82)),
}],
),
];
for (input, exp, mut value_exprs) in cases {
let rewritten = input.rewrite(&mut rewriter).unwrap();
assert_eq!(rewritten, exp);
assert_eq!(rewriter.value_exprs, &mut value_exprs);
}
// Test case with single field.
let mut rewriter = FieldValueRewriter {
value_exprs: &mut vec![],
};
let input = col(VALUE_COLUMN_NAME).gt(lit(1.88));
let rewritten = input.clone().rewrite(&mut rewriter).unwrap();
assert_eq!(rewritten, lit(true));
assert_eq!(rewriter.value_exprs, &mut vec![ValueExpr { expr: input }]);
}
}