refactor: Remove predicate module from predicate crate (#3648)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
17fbeaaade
commit
2e30483f1f
|
@ -14,7 +14,7 @@ use job_registry::JobRegistry;
|
|||
use metric::{Attributes, DurationCounter, Metric, U64Counter};
|
||||
use observability_deps::tracing::debug;
|
||||
use parking_lot::Mutex;
|
||||
use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta};
|
||||
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
|
||||
use query::{
|
||||
provider::{ChunkPruner, ProviderBuilder},
|
||||
pruning::{prune_chunks, PruningObserver},
|
||||
|
@ -398,7 +398,7 @@ mod tests {
|
|||
use super::*;
|
||||
use crate::test_helpers::write_lp;
|
||||
use crate::utils::make_db;
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::PredicateBuilder;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_filtered_chunks() {
|
||||
|
|
|
@ -15,7 +15,7 @@ use mutable_buffer::snapshot::ChunkSnapshot;
|
|||
use observability_deps::tracing::debug;
|
||||
use parquet_file::chunk::ParquetChunk;
|
||||
use partition_metadata::TableSummary;
|
||||
use predicate::predicate::{Predicate, PredicateMatch};
|
||||
use predicate::{Predicate, PredicateMatch};
|
||||
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
|
||||
use read_buffer::RBChunk;
|
||||
use schema::InfluxColumnType;
|
||||
|
|
|
@ -42,7 +42,7 @@ use parquet_catalog::{
|
|||
prune::prune_history as prune_catalog_transaction_history,
|
||||
};
|
||||
use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
|
||||
use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta};
|
||||
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
|
||||
use query::{
|
||||
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
|
||||
QueryCompletedToken, QueryDatabase,
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
use std::convert::TryFrom;
|
||||
|
||||
use predicate::predicate::Predicate;
|
||||
use predicate::Predicate;
|
||||
use snafu::Snafu;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
|
@ -55,7 +55,7 @@ pub mod test {
|
|||
use datafusion::logical_plan::{col, lit, Expr};
|
||||
|
||||
use datafusion::scalar::ScalarValue;
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::PredicateBuilder;
|
||||
use read_buffer::BinaryExpr as RBBinaryExpr;
|
||||
use read_buffer::Predicate as RBPredicate;
|
||||
|
||||
|
|
|
@ -26,9 +26,9 @@ use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
|
|||
use observability_deps::tracing::warn;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::{
|
||||
predicate::PredicateBuilder,
|
||||
regex::regex_match_expr,
|
||||
rpc_predicate::{FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME},
|
||||
PredicateBuilder,
|
||||
};
|
||||
use query::group_by::{Aggregate as QueryAggregate, WindowDuration};
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
|
@ -867,7 +867,7 @@ fn format_comparison(v: i32, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use generated_types::node::Type as RPCNodeType;
|
||||
use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta};
|
||||
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
|
||||
use std::{collections::BTreeSet, sync::Arc};
|
||||
|
||||
use super::*;
|
||||
|
|
|
@ -1351,7 +1351,7 @@ mod tests {
|
|||
Client as StorageClient, OrgAndBucket,
|
||||
};
|
||||
use panic_logging::SendPanicsToTracing;
|
||||
use predicate::predicate::{PredicateBuilder, PredicateMatch};
|
||||
use predicate::{PredicateBuilder, PredicateMatch};
|
||||
use query::{
|
||||
exec::Executor,
|
||||
test::{TestChunk, TestDatabase, TestError},
|
||||
|
@ -2971,7 +2971,7 @@ mod tests {
|
|||
db_name: &str,
|
||||
partition_key: &str,
|
||||
chunk_id: u128,
|
||||
expected_predicate: &predicate::predicate::Predicate,
|
||||
expected_predicate: &predicate::Predicate,
|
||||
) {
|
||||
let actual_predicates = self
|
||||
.test_storage
|
||||
|
|
|
@ -15,10 +15,7 @@ use datafusion::physical_plan::{
|
|||
SendableRecordBatchStream,
|
||||
};
|
||||
use iox_catalog::interface::{SequenceNumber, Tombstone};
|
||||
use predicate::{
|
||||
delete_predicate::parse_delete_predicate,
|
||||
predicate::{Predicate, PredicateMatch},
|
||||
};
|
||||
use predicate::{delete_predicate::parse_delete_predicate, Predicate, PredicateMatch};
|
||||
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
|
||||
use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
|
|
@ -5,7 +5,7 @@ use data_types::{
|
|||
};
|
||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use iox_object_store::{IoxObjectStore, ParquetFilePath};
|
||||
use predicate::predicate::Predicate;
|
||||
use predicate::Predicate;
|
||||
use schema::selection::Selection;
|
||||
use schema::{Schema, TIME_COLUMN_NAME};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
|
|
@ -22,7 +22,7 @@ use parquet::{
|
|||
basic::Compression,
|
||||
file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone},
|
||||
};
|
||||
use predicate::predicate::Predicate;
|
||||
use predicate::Predicate;
|
||||
use schema::selection::Selection;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::{
|
||||
|
|
|
@ -68,7 +68,7 @@ pub enum Error {
|
|||
/// Result type for Parser Cient
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
impl From<DeletePredicate> for crate::predicate::Predicate {
|
||||
impl From<DeletePredicate> for crate::Predicate {
|
||||
fn from(pred: DeletePredicate) -> Self {
|
||||
Self {
|
||||
field_columns: None,
|
||||
|
|
|
@ -10,7 +10,648 @@
|
|||
|
||||
pub mod delete_expr;
|
||||
pub mod delete_predicate;
|
||||
pub mod predicate;
|
||||
pub mod regex;
|
||||
pub mod rewrite;
|
||||
pub mod rpc_predicate;
|
||||
|
||||
use data_types::timestamp::{TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME};
|
||||
use datafusion::{
|
||||
error::DataFusionError,
|
||||
logical_plan::{col, lit_timestamp_nano, Column, Expr, Operator},
|
||||
optimizer::utils,
|
||||
};
|
||||
use datafusion_util::{make_range_expr, AndExprBuilder};
|
||||
use observability_deps::tracing::debug;
|
||||
use schema::TIME_COLUMN_NAME;
|
||||
use std::{
|
||||
collections::{BTreeSet, HashSet},
|
||||
fmt,
|
||||
};
|
||||
|
||||
/// This `Predicate` represents the empty predicate (aka that evaluates to true for all rows).
|
||||
pub const EMPTY_PREDICATE: Predicate = Predicate {
|
||||
field_columns: None,
|
||||
exprs: vec![],
|
||||
range: None,
|
||||
partition_key: None,
|
||||
value_expr: vec![],
|
||||
};
|
||||
|
||||
/// A unified Predicate structure for IOx queries that can select and filter Fields and Tags from
|
||||
/// the InfluxDB data mode, as well as for arbitrary other predicates that are expressed by
|
||||
/// DataFusion's [`Expr`] type.
|
||||
///
|
||||
/// Note that the InfluxDB data model (e.g. ParsedLine's) distinguishes between some types of
|
||||
/// columns (tags and fields), and likewise the semantics of this structure can express some types
|
||||
/// of restrictions that only apply to certain types of columns.
|
||||
#[derive(Clone, Debug, Default, PartialEq, PartialOrd)]
|
||||
pub struct Predicate {
|
||||
/// Optional field restriction. If present, restricts the results to only
|
||||
/// tables which have *at least one* of the fields in field_columns.
|
||||
pub field_columns: Option<BTreeSet<String>>,
|
||||
|
||||
/// Optional partition key filter
|
||||
pub partition_key: Option<String>,
|
||||
|
||||
/// Optional timestamp range: only rows within this range are included in
|
||||
/// results. Other rows are excluded
|
||||
pub range: Option<TimestampRange>,
|
||||
|
||||
/// Optional arbitrary predicates, represented as list of
|
||||
/// DataFusion expressions applied a logical conjunction (aka they
|
||||
/// are 'AND'ed together). Only rows that evaluate to TRUE for all
|
||||
/// these expressions should be returned. Other rows are excluded
|
||||
/// from the results.
|
||||
pub exprs: Vec<Expr>,
|
||||
|
||||
/// Optional arbitrary predicates on the special `_value` column. These
|
||||
/// expressions are applied to `field_columns` projections in the form of
|
||||
/// `CASE` statement conditions.
|
||||
pub value_expr: Vec<BinaryExpr>,
|
||||
}
|
||||
|
||||
impl Predicate {
|
||||
/// Return true if this predicate has any general purpose predicates
|
||||
pub fn has_exprs(&self) -> bool {
|
||||
!self.exprs.is_empty()
|
||||
}
|
||||
|
||||
/// Return a DataFusion `Expr` predicate representing the
|
||||
/// combination of all predicate (`exprs`) and timestamp
|
||||
/// restriction in this Predicate. Returns None if there are no
|
||||
/// `Expr`'s restricting the data
|
||||
pub fn filter_expr(&self) -> Option<Expr> {
|
||||
let mut builder =
|
||||
AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr());
|
||||
|
||||
for expr in &self.exprs {
|
||||
builder = builder.append_expr(expr.clone());
|
||||
}
|
||||
|
||||
builder.build()
|
||||
}
|
||||
|
||||
/// Return true if the field should be included in results
|
||||
pub fn should_include_field(&self, field_name: &str) -> bool {
|
||||
match &self.field_columns {
|
||||
None => true, // No field restriction on predicate
|
||||
Some(field_names) => field_names.contains(field_name),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a DataFusion predicate for appliying a timestamp range:
|
||||
///
|
||||
/// `range.start <= time and time < range.end`
|
||||
fn make_timestamp_predicate_expr(&self) -> Option<Expr> {
|
||||
self.range
|
||||
.map(|range| make_range_expr(range.start(), range.end(), TIME_COLUMN_NAME))
|
||||
}
|
||||
|
||||
/// Returns true if ths predicate evaluates to true for all rows
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self == &EMPTY_PREDICATE
|
||||
}
|
||||
|
||||
/// Return a negated DF logical expression for the given delete predicates
|
||||
pub fn negated_expr<S>(delete_predicates: &[S]) -> Option<Expr>
|
||||
where
|
||||
S: AsRef<Self>,
|
||||
{
|
||||
if delete_predicates.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut pred = PredicateBuilder::default().build();
|
||||
pred.merge_delete_predicates(delete_predicates);
|
||||
|
||||
// Make a conjunctive expression of the pred.exprs
|
||||
let mut val = None;
|
||||
for e in pred.exprs {
|
||||
match val {
|
||||
None => val = Some(e),
|
||||
Some(expr) => val = Some(expr.and(e)),
|
||||
}
|
||||
}
|
||||
|
||||
val
|
||||
}
|
||||
|
||||
/// Merge the given delete predicates into this select predicate.
|
||||
/// Since we want to eliminate data filtered by the delete predicates,
|
||||
/// they are first converted into their negated form: NOT(delete_predicate)
|
||||
/// then added/merged into the selection one
|
||||
pub fn merge_delete_predicates<S>(&mut self, delete_predicates: &[S])
|
||||
where
|
||||
S: AsRef<Self>,
|
||||
{
|
||||
// Create a list of disjunctive negated expressions.
|
||||
// Example: there are two deletes as follows (note that time_range is stored separated in the Predicate
|
||||
// but we need to put it together with the exprs here)
|
||||
// . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30)
|
||||
// . Delete 2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50)
|
||||
// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means
|
||||
// NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30]), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50]) which means
|
||||
// [NOT(city = Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30])], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50])]
|
||||
// Note that the "NOT(time_range in [20, 50])]" or "NOT(20 <= time <= 50)"" is replaced with "time < 20 OR time > 50"
|
||||
|
||||
for pred in delete_predicates {
|
||||
let pred = pred.as_ref();
|
||||
|
||||
let mut expr: Option<Expr> = None;
|
||||
|
||||
// Time range
|
||||
if let Some(range) = pred.range {
|
||||
// time_expr = NOT(start <= time_range <= end)
|
||||
// Equivalent to: (time < start OR time > end)
|
||||
let time_expr = col(TIME_COLUMN_NAME)
|
||||
.lt(lit_timestamp_nano(range.start()))
|
||||
.or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end())));
|
||||
|
||||
match expr {
|
||||
None => expr = Some(time_expr),
|
||||
Some(e) => expr = Some(e.or(time_expr)),
|
||||
}
|
||||
}
|
||||
|
||||
// Exprs
|
||||
for exp in &pred.exprs {
|
||||
match expr {
|
||||
None => expr = Some(exp.clone().not()),
|
||||
Some(e) => expr = Some(e.or(exp.clone().not())),
|
||||
}
|
||||
}
|
||||
|
||||
// Push the negated expression of the delete predicate into the list exprs of the select predicate
|
||||
if let Some(e) = expr {
|
||||
self.exprs.push(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the timestamp range from this predicate, if the range
|
||||
/// is for the entire min/max valid range.
|
||||
///
|
||||
/// This is used in certain cases to retain compatibility with the
|
||||
/// existing storage engine
|
||||
pub(crate) fn clear_timestamp_if_max_range(mut self) -> Self {
|
||||
self.range = self.range.take().and_then(|range| {
|
||||
if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME {
|
||||
None
|
||||
} else {
|
||||
Some(range)
|
||||
}
|
||||
});
|
||||
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Predicate {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fn iter_to_str<S>(s: impl IntoIterator<Item = S>) -> String
|
||||
where
|
||||
S: ToString,
|
||||
{
|
||||
s.into_iter()
|
||||
.map(|v| v.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
write!(f, "Predicate")?;
|
||||
|
||||
if let Some(field_columns) = &self.field_columns {
|
||||
write!(f, " field_columns: {{{}}}", iter_to_str(field_columns))?;
|
||||
}
|
||||
|
||||
if let Some(partition_key) = &self.partition_key {
|
||||
write!(f, " partition_key: '{}'", partition_key)?;
|
||||
}
|
||||
|
||||
if let Some(range) = &self.range {
|
||||
// TODO: could be nice to show this as actual timestamps (not just numbers)?
|
||||
write!(f, " range: [{} - {}]", range.start(), range.end())?;
|
||||
}
|
||||
|
||||
if !self.exprs.is_empty() {
|
||||
write!(f, " exprs: [")?;
|
||||
for (i, expr) in self.exprs.iter().enumerate() {
|
||||
write!(f, "{}", expr)?;
|
||||
if i < self.exprs.len() - 1 {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
}
|
||||
write!(f, "]")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
/// The result of evaluating a predicate on a set of rows
|
||||
pub enum PredicateMatch {
|
||||
/// There is at least one row that matches the predicate that has
|
||||
/// at least one non null value in each field of the predicate
|
||||
AtLeastOneNonNullField,
|
||||
|
||||
/// There are exactly zero rows that match the predicate
|
||||
Zero,
|
||||
|
||||
/// There *may* be rows that match, OR there *may* be no rows that
|
||||
/// match
|
||||
Unknown,
|
||||
}
|
||||
|
||||
/// Structure for building [`Predicate`]s
|
||||
///
|
||||
/// Example:
|
||||
/// ```
|
||||
/// use predicate::PredicateBuilder;
|
||||
/// use datafusion::logical_plan::{col, lit};
|
||||
///
|
||||
/// let p = PredicateBuilder::new()
|
||||
/// .timestamp_range(1, 100)
|
||||
/// .add_expr(col("foo").eq(lit(42)))
|
||||
/// .build();
|
||||
///
|
||||
/// assert_eq!(
|
||||
/// p.to_string(),
|
||||
/// "Predicate range: [1 - 100] exprs: [#foo = Int32(42)]"
|
||||
/// );
|
||||
/// ```
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PredicateBuilder {
|
||||
inner: Predicate,
|
||||
}
|
||||
|
||||
impl From<Predicate> for PredicateBuilder {
|
||||
fn from(inner: Predicate) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl PredicateBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Sets the timestamp range
|
||||
pub fn timestamp_range(mut self, start: i64, end: i64) -> Self {
|
||||
// Without more thought, redefining the timestamp range would
|
||||
// lose the old range. Asser that that cannot happen.
|
||||
assert!(
|
||||
self.inner.range.is_none(),
|
||||
"Unexpected re-definition of timestamp range"
|
||||
);
|
||||
|
||||
self.inner.range = Some(TimestampRange::new(start, end));
|
||||
self
|
||||
}
|
||||
|
||||
/// sets the optional timestamp range, if any
|
||||
pub fn timestamp_range_option(mut self, range: Option<TimestampRange>) -> Self {
|
||||
// Without more thought, redefining the timestamp range would
|
||||
// lose the old range. Asser that that cannot happen.
|
||||
assert!(
|
||||
range.is_none() || self.inner.range.is_none(),
|
||||
"Unexpected re-definition of timestamp range"
|
||||
);
|
||||
self.inner.range = range;
|
||||
self
|
||||
}
|
||||
|
||||
/// Adds an expression to the list of general purpose predicates
|
||||
pub fn add_expr(mut self, expr: Expr) -> Self {
|
||||
self.inner.exprs.push(expr);
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds a regex matching expression from the provided column name and
|
||||
/// pattern. Values not matching the regex will be filtered out.
|
||||
pub fn build_regex_match_expr(self, column: &str, pattern: impl Into<String>) -> Self {
|
||||
self.regex_match_expr(column, pattern, true)
|
||||
}
|
||||
|
||||
/// Builds a regex "not matching" expression from the provided column name
|
||||
/// and pattern. Values *matching* the regex will be filtered out.
|
||||
pub fn build_regex_not_match_expr(self, column: &str, pattern: impl Into<String>) -> Self {
|
||||
self.regex_match_expr(column, pattern, false)
|
||||
}
|
||||
|
||||
fn regex_match_expr(mut self, column: &str, pattern: impl Into<String>, matches: bool) -> Self {
|
||||
let expr = crate::regex::regex_match_expr(col(column), pattern.into(), matches);
|
||||
self.inner.exprs.push(expr);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets field_column restriction
|
||||
pub fn field_columns(mut self, columns: Vec<impl Into<String>>) -> Self {
|
||||
// We need to distinguish predicates like `column_name In
|
||||
// (foo, bar)` and `column_name = foo and column_name = bar` in order to handle
|
||||
// this
|
||||
if self.inner.field_columns.is_some() {
|
||||
unimplemented!("Complex/Multi field predicates are not yet supported");
|
||||
}
|
||||
|
||||
let column_names = columns
|
||||
.into_iter()
|
||||
.map(|s| s.into())
|
||||
.collect::<BTreeSet<_>>();
|
||||
|
||||
self.inner.field_columns = Some(column_names);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the partition key restriction
|
||||
pub fn partition_key(mut self, partition_key: impl Into<String>) -> Self {
|
||||
assert!(
|
||||
self.inner.partition_key.is_none(),
|
||||
"multiple partition key predicates not suported"
|
||||
);
|
||||
self.inner.partition_key = Some(partition_key.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Create a predicate, consuming this builder
|
||||
pub fn build(self) -> Predicate {
|
||||
self.inner
|
||||
}
|
||||
|
||||
/// Adds only the expressions from `filters` that can be pushed down to
|
||||
/// execution engines.
|
||||
pub fn add_pushdown_exprs(mut self, filters: &[Expr]) -> Self {
|
||||
// For each expression of the filters, recursively split it, if it is is an AND conjunction
|
||||
// For example, expression (x AND y) will be split into a vector of 2 expressions [x, y]
|
||||
let mut exprs = vec![];
|
||||
filters
|
||||
.iter()
|
||||
.for_each(|expr| Self::split_members(expr, &mut exprs));
|
||||
|
||||
// Only keep single_column and primitive binary expressions
|
||||
let mut pushdown_exprs: Vec<Expr> = vec![];
|
||||
let exprs_result = exprs
|
||||
.into_iter()
|
||||
.try_for_each::<_, Result<_, DataFusionError>>(|expr| {
|
||||
let mut columns = HashSet::new();
|
||||
utils::expr_to_columns(&expr, &mut columns)?;
|
||||
|
||||
if columns.len() == 1 && Self::primitive_binary_expr(&expr) {
|
||||
pushdown_exprs.push(expr);
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
match exprs_result {
|
||||
Ok(()) => {
|
||||
// Return the builder with only the pushdownable expressions on it.
|
||||
self.inner.exprs.append(&mut pushdown_exprs);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Error, {}, building push-down predicates for filters: {:#?}. No predicates are pushed down", e, filters);
|
||||
}
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Recursively split all "AND" expressions into smaller one
|
||||
/// Example: "A AND B AND C" => [A, B, C]
|
||||
pub fn split_members(predicate: &Expr, predicates: &mut Vec<Expr>) {
|
||||
match predicate {
|
||||
Expr::BinaryExpr {
|
||||
right,
|
||||
op: Operator::And,
|
||||
left,
|
||||
} => {
|
||||
Self::split_members(left, predicates);
|
||||
Self::split_members(right, predicates);
|
||||
}
|
||||
other => predicates.push(other.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return true if the given expression is in a primitive binary in the form: `column op constant`
|
||||
// and op must be a comparison one
|
||||
pub fn primitive_binary_expr(expr: &Expr) -> bool {
|
||||
match expr {
|
||||
Expr::BinaryExpr { left, op, right } => {
|
||||
matches!(
|
||||
(&**left, &**right),
|
||||
(Expr::Column(_), Expr::Literal(_)) | (Expr::Literal(_), Expr::Column(_))
|
||||
) && matches!(
|
||||
op,
|
||||
Operator::Eq
|
||||
| Operator::NotEq
|
||||
| Operator::Lt
|
||||
| Operator::LtEq
|
||||
| Operator::Gt
|
||||
| Operator::GtEq
|
||||
)
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A representation of the `BinaryExpr` variant of a Datafusion expression.
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd)]
|
||||
pub struct BinaryExpr {
|
||||
pub left: Column,
|
||||
pub op: Operator,
|
||||
pub right: Expr,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
|
||||
#[test]
|
||||
fn test_default_predicate_is_empty() {
|
||||
let p = Predicate::default();
|
||||
assert!(p.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_non_default_predicate_is_not_empty() {
|
||||
let p = PredicateBuilder::new().timestamp_range(1, 100).build();
|
||||
|
||||
assert!(!p.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pushdown_predicates() {
|
||||
let mut filters = vec![];
|
||||
|
||||
// state = CA
|
||||
let expr1 = col("state").eq(lit("CA"));
|
||||
filters.push(expr1);
|
||||
|
||||
// "price > 10"
|
||||
let expr2 = col("price").gt(lit(10));
|
||||
filters.push(expr2);
|
||||
|
||||
// a < 10 AND b >= 50 --> will be split to [a < 10, b >= 50]
|
||||
let expr3 = col("a").lt(lit(10)).and(col("b").gt_eq(lit(50)));
|
||||
filters.push(expr3);
|
||||
|
||||
// c != 3 OR d = 8 --> won't be pushed down
|
||||
let expr4 = col("c").not_eq(lit(3)).or(col("d").eq(lit(8)));
|
||||
filters.push(expr4);
|
||||
|
||||
// e is null --> won't be pushed down
|
||||
let expr5 = col("e").is_null();
|
||||
filters.push(expr5);
|
||||
|
||||
// f <= 60
|
||||
let expr6 = col("f").lt_eq(lit(60));
|
||||
filters.push(expr6);
|
||||
|
||||
// g is not null --> won't be pushed down
|
||||
let expr7 = col("g").is_not_null();
|
||||
filters.push(expr7);
|
||||
|
||||
// h + i --> won't be pushed down
|
||||
let expr8 = col("h") + col("i");
|
||||
filters.push(expr8);
|
||||
|
||||
// city = Boston
|
||||
let expr9 = col("city").eq(lit("Boston"));
|
||||
filters.push(expr9);
|
||||
|
||||
// city != Braintree
|
||||
let expr9 = col("city").not_eq(lit("Braintree"));
|
||||
filters.push(expr9);
|
||||
|
||||
// city != state --> won't be pushed down
|
||||
let expr10 = col("city").not_eq(col("state"));
|
||||
filters.push(expr10);
|
||||
|
||||
// city = state --> won't be pushed down
|
||||
let expr11 = col("city").eq(col("state"));
|
||||
filters.push(expr11);
|
||||
|
||||
// city_state = city + state --> won't be pushed down
|
||||
let expr12 = col("city_sate").eq(col("city") + col("state"));
|
||||
filters.push(expr12);
|
||||
|
||||
// city = city + 5 --> won't be pushed down
|
||||
let expr13 = col("city").eq(col("city") + lit(5));
|
||||
filters.push(expr13);
|
||||
|
||||
// city = city --> won't be pushed down
|
||||
let expr14 = col("city").eq(col("city"));
|
||||
filters.push(expr14);
|
||||
|
||||
// city + 5 = city --> won't be pushed down
|
||||
let expr15 = (col("city") + lit(5)).eq(col("city"));
|
||||
filters.push(expr15);
|
||||
|
||||
// 5 = city
|
||||
let expr16 = lit(5).eq(col("city"));
|
||||
filters.push(expr16);
|
||||
|
||||
println!(" --------------- Filters: {:#?}", filters);
|
||||
|
||||
// Expected pushdown predicates: [state = CA, price > 10, a < 10, b >= 50, f <= 60, city = Boston, city != Braintree, 5 = city]
|
||||
let predicate = PredicateBuilder::default()
|
||||
.add_pushdown_exprs(&filters)
|
||||
.build();
|
||||
|
||||
println!(" ------------- Predicates: {:#?}", predicate);
|
||||
assert_eq!(predicate.exprs.len(), 8);
|
||||
assert_eq!(predicate.exprs[0], col("state").eq(lit("CA")));
|
||||
assert_eq!(predicate.exprs[1], col("price").gt(lit(10)));
|
||||
assert_eq!(predicate.exprs[2], col("a").lt(lit(10)));
|
||||
assert_eq!(predicate.exprs[3], col("b").gt_eq(lit(50)));
|
||||
assert_eq!(predicate.exprs[4], col("f").lt_eq(lit(60)));
|
||||
assert_eq!(predicate.exprs[5], col("city").eq(lit("Boston")));
|
||||
assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree")));
|
||||
assert_eq!(predicate.exprs[7], lit(5).eq(col("city")));
|
||||
}
|
||||
#[test]
|
||||
fn predicate_display_ts() {
|
||||
// TODO make this a doc example?
|
||||
let p = PredicateBuilder::new().timestamp_range(1, 100).build();
|
||||
|
||||
assert_eq!(p.to_string(), "Predicate range: [1 - 100]");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn predicate_display_ts_and_expr() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(1, 100)
|
||||
.add_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11))))
|
||||
.build();
|
||||
|
||||
assert_eq!(
|
||||
p.to_string(),
|
||||
"Predicate range: [1 - 100] exprs: [#foo = Int32(42) AND #bar < Int32(11)]"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn predicate_display_full() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(1, 100)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.field_columns(vec!["f1", "f2"])
|
||||
.partition_key("the_key")
|
||||
.build();
|
||||
|
||||
assert_eq!(p.to_string(), "Predicate field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_timestamp_if_max_range_out_of_range() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(1, 100)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
|
||||
let expected = p.clone();
|
||||
|
||||
// no rewrite
|
||||
assert_eq!(p.clear_timestamp_if_max_range(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_timestamp_if_max_range_out_of_range_low() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(MIN_NANO_TIME, 100)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
|
||||
let expected = p.clone();
|
||||
|
||||
// no rewrite
|
||||
assert_eq!(p.clear_timestamp_if_max_range(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_timestamp_if_max_range_out_of_range_high() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(0, MAX_NANO_TIME)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
|
||||
let expected = p.clone();
|
||||
|
||||
// no rewrite
|
||||
assert_eq!(p.clear_timestamp_if_max_range(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_timestamp_if_max_range_in_range() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
|
||||
let expected = PredicateBuilder::new()
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
// rewrite
|
||||
assert_eq!(p.clear_timestamp_if_max_range(), expected);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,648 +0,0 @@
|
|||
//! This module contains a unified Predicate structure for IOx qieries
|
||||
//! that can select and filter Fields and Tags from the InfluxDB data
|
||||
//! mode as well as for arbitrary other predicates that are expressed
|
||||
//! by DataFusion's `Expr` type.
|
||||
|
||||
use std::{
|
||||
collections::{BTreeSet, HashSet},
|
||||
fmt,
|
||||
};
|
||||
|
||||
use data_types::timestamp::{TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME};
|
||||
use datafusion::{
|
||||
error::DataFusionError,
|
||||
logical_plan::{col, lit_timestamp_nano, Column, Expr, Operator},
|
||||
optimizer::utils,
|
||||
};
|
||||
use datafusion_util::{make_range_expr, AndExprBuilder};
|
||||
use observability_deps::tracing::debug;
|
||||
use schema::TIME_COLUMN_NAME;
|
||||
|
||||
/// This `Predicate` represents the empty predicate (aka that
|
||||
/// evaluates to true for all rows).
|
||||
pub const EMPTY_PREDICATE: Predicate = Predicate {
|
||||
field_columns: None,
|
||||
exprs: vec![],
|
||||
range: None,
|
||||
partition_key: None,
|
||||
value_expr: vec![],
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
/// The result of evaluating a predicate on a set of rows
|
||||
pub enum PredicateMatch {
|
||||
/// There is at least one row that matches the predicate that has
|
||||
/// at least one non null value in each field of the predicate
|
||||
AtLeastOneNonNullField,
|
||||
|
||||
/// There are exactly zero rows that match the predicate
|
||||
Zero,
|
||||
|
||||
/// There *may* be rows that match, OR there *may* be no rows that
|
||||
/// match
|
||||
Unknown,
|
||||
}
|
||||
|
||||
/// Represents a parsed predicate for evaluation by the InfluxDB IOx
|
||||
/// query engine.
|
||||
///
|
||||
/// Note that the InfluxDB data model (e.g. ParsedLine's)
|
||||
/// distinguishes between some types of columns (tags and fields), and
|
||||
/// likewise the semantics of this structure can express some types of
|
||||
/// restrictions that only apply to certain types of columns.
|
||||
#[derive(Clone, Debug, Default, PartialEq, PartialOrd)]
|
||||
pub struct Predicate {
|
||||
/// Optional field restriction. If present, restricts the results to only
|
||||
/// tables which have *at least one* of the fields in field_columns.
|
||||
pub field_columns: Option<BTreeSet<String>>,
|
||||
|
||||
/// Optional partition key filter
|
||||
pub partition_key: Option<String>,
|
||||
|
||||
/// Optional timestamp range: only rows within this range are included in
|
||||
/// results. Other rows are excluded
|
||||
pub range: Option<TimestampRange>,
|
||||
|
||||
/// Optional arbitrary predicates, represented as list of
|
||||
/// DataFusion expressions applied a logical conjunction (aka they
|
||||
/// are 'AND'ed together). Only rows that evaluate to TRUE for all
|
||||
/// these expressions should be returned. Other rows are excluded
|
||||
/// from the results.
|
||||
pub exprs: Vec<Expr>,
|
||||
|
||||
/// Optional arbitrary predicates on the special `_value` column. These
|
||||
/// expressions are applied to `field_columns` projections in the form of
|
||||
/// `CASE` statement conditions.
|
||||
pub value_expr: Vec<BinaryExpr>,
|
||||
}
|
||||
|
||||
impl Predicate {
|
||||
/// Return true if this predicate has any general purpose predicates
|
||||
pub fn has_exprs(&self) -> bool {
|
||||
!self.exprs.is_empty()
|
||||
}
|
||||
|
||||
/// Return a DataFusion `Expr` predicate representing the
|
||||
/// combination of all predicate (`exprs`) and timestamp
|
||||
/// restriction in this Predicate. Returns None if there are no
|
||||
/// `Expr`'s restricting the data
|
||||
pub fn filter_expr(&self) -> Option<Expr> {
|
||||
let mut builder =
|
||||
AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr());
|
||||
|
||||
for expr in &self.exprs {
|
||||
builder = builder.append_expr(expr.clone());
|
||||
}
|
||||
|
||||
builder.build()
|
||||
}
|
||||
|
||||
/// Return true if the field should be included in results
|
||||
pub fn should_include_field(&self, field_name: &str) -> bool {
|
||||
match &self.field_columns {
|
||||
None => true, // No field restriction on predicate
|
||||
Some(field_names) => field_names.contains(field_name),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a DataFusion predicate for appliying a timestamp range:
|
||||
///
|
||||
/// `range.start <= time and time < range.end`
|
||||
fn make_timestamp_predicate_expr(&self) -> Option<Expr> {
|
||||
self.range
|
||||
.map(|range| make_range_expr(range.start(), range.end(), TIME_COLUMN_NAME))
|
||||
}
|
||||
|
||||
/// Returns true if ths predicate evaluates to true for all rows
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self == &EMPTY_PREDICATE
|
||||
}
|
||||
|
||||
/// Return a negated DF logical expression for the given delete predicates
|
||||
pub fn negated_expr<S>(delete_predicates: &[S]) -> Option<Expr>
|
||||
where
|
||||
S: AsRef<Self>,
|
||||
{
|
||||
if delete_predicates.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut pred = PredicateBuilder::default().build();
|
||||
pred.merge_delete_predicates(delete_predicates);
|
||||
|
||||
// Make a conjunctive expression of the pred.exprs
|
||||
let mut val = None;
|
||||
for e in pred.exprs {
|
||||
match val {
|
||||
None => val = Some(e),
|
||||
Some(expr) => val = Some(expr.and(e)),
|
||||
}
|
||||
}
|
||||
|
||||
val
|
||||
}
|
||||
|
||||
/// Merge the given delete predicates into this select predicate.
|
||||
/// Since we want to eliminate data filtered by the delete predicates,
|
||||
/// they are first converted into their negated form: NOT(delete_predicate)
|
||||
/// then added/merged into the selection one
|
||||
pub fn merge_delete_predicates<S>(&mut self, delete_predicates: &[S])
|
||||
where
|
||||
S: AsRef<Self>,
|
||||
{
|
||||
// Create a list of disjunctive negated expressions.
|
||||
// Example: there are two deletes as follows (note that time_range is stored separated in the Predicate
|
||||
// but we need to put it together with the exprs here)
|
||||
// . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30)
|
||||
// . Delete 2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50)
|
||||
// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means
|
||||
// NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30]), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50]) which means
|
||||
// [NOT(city = Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30])], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50])]
|
||||
// Note that the "NOT(time_range in [20, 50])]" or "NOT(20 <= time <= 50)"" is replaced with "time < 20 OR time > 50"
|
||||
|
||||
for pred in delete_predicates {
|
||||
let pred = pred.as_ref();
|
||||
|
||||
let mut expr: Option<Expr> = None;
|
||||
|
||||
// Time range
|
||||
if let Some(range) = pred.range {
|
||||
// time_expr = NOT(start <= time_range <= end)
|
||||
// Equivalent to: (time < start OR time > end)
|
||||
let time_expr = col(TIME_COLUMN_NAME)
|
||||
.lt(lit_timestamp_nano(range.start()))
|
||||
.or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end())));
|
||||
|
||||
match expr {
|
||||
None => expr = Some(time_expr),
|
||||
Some(e) => expr = Some(e.or(time_expr)),
|
||||
}
|
||||
}
|
||||
|
||||
// Exprs
|
||||
for exp in &pred.exprs {
|
||||
match expr {
|
||||
None => expr = Some(exp.clone().not()),
|
||||
Some(e) => expr = Some(e.or(exp.clone().not())),
|
||||
}
|
||||
}
|
||||
|
||||
// Push the negated expression of the delete predicate into the list exprs of the select predicate
|
||||
if let Some(e) = expr {
|
||||
self.exprs.push(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the timestamp range from this predicate, if the range
|
||||
/// is for the entire min/max valid range.
|
||||
///
|
||||
/// This is used in certain cases to retain compatibility with the
|
||||
/// existing storage engine
|
||||
pub(crate) fn clear_timestamp_if_max_range(mut self) -> Self {
|
||||
self.range = self.range.take().and_then(|range| {
|
||||
if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME {
|
||||
None
|
||||
} else {
|
||||
Some(range)
|
||||
}
|
||||
});
|
||||
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Predicate {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fn iter_to_str<S>(s: impl IntoIterator<Item = S>) -> String
|
||||
where
|
||||
S: ToString,
|
||||
{
|
||||
s.into_iter()
|
||||
.map(|v| v.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
}
|
||||
|
||||
write!(f, "Predicate")?;
|
||||
|
||||
if let Some(field_columns) = &self.field_columns {
|
||||
write!(f, " field_columns: {{{}}}", iter_to_str(field_columns))?;
|
||||
}
|
||||
|
||||
if let Some(partition_key) = &self.partition_key {
|
||||
write!(f, " partition_key: '{}'", partition_key)?;
|
||||
}
|
||||
|
||||
if let Some(range) = &self.range {
|
||||
// TODO: could be nice to show this as actual timestamps (not just numbers)?
|
||||
write!(f, " range: [{} - {}]", range.start(), range.end())?;
|
||||
}
|
||||
|
||||
if !self.exprs.is_empty() {
|
||||
write!(f, " exprs: [")?;
|
||||
for (i, expr) in self.exprs.iter().enumerate() {
|
||||
write!(f, "{}", expr)?;
|
||||
if i < self.exprs.len() - 1 {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
}
|
||||
write!(f, "]")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
/// Structure for building [`Predicate`]s
|
||||
///
|
||||
/// Example:
|
||||
/// ```
|
||||
/// use predicate::predicate::PredicateBuilder;
|
||||
/// use datafusion::logical_plan::{col, lit};
|
||||
///
|
||||
/// let p = PredicateBuilder::new()
|
||||
/// .timestamp_range(1, 100)
|
||||
/// .add_expr(col("foo").eq(lit(42)))
|
||||
/// .build();
|
||||
///
|
||||
/// assert_eq!(
|
||||
/// p.to_string(),
|
||||
/// "Predicate range: [1 - 100] exprs: [#foo = Int32(42)]"
|
||||
/// );
|
||||
/// ```
|
||||
pub struct PredicateBuilder {
|
||||
inner: Predicate,
|
||||
}
|
||||
|
||||
impl From<Predicate> for PredicateBuilder {
|
||||
fn from(inner: Predicate) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl PredicateBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Sets the timestamp range
|
||||
pub fn timestamp_range(mut self, start: i64, end: i64) -> Self {
|
||||
// Without more thought, redefining the timestamp range would
|
||||
// lose the old range. Asser that that cannot happen.
|
||||
assert!(
|
||||
self.inner.range.is_none(),
|
||||
"Unexpected re-definition of timestamp range"
|
||||
);
|
||||
|
||||
self.inner.range = Some(TimestampRange::new(start, end));
|
||||
self
|
||||
}
|
||||
|
||||
/// sets the optional timestamp range, if any
|
||||
pub fn timestamp_range_option(mut self, range: Option<TimestampRange>) -> Self {
|
||||
// Without more thought, redefining the timestamp range would
|
||||
// lose the old range. Asser that that cannot happen.
|
||||
assert!(
|
||||
range.is_none() || self.inner.range.is_none(),
|
||||
"Unexpected re-definition of timestamp range"
|
||||
);
|
||||
self.inner.range = range;
|
||||
self
|
||||
}
|
||||
|
||||
/// Adds an expression to the list of general purpose predicates
|
||||
pub fn add_expr(mut self, expr: Expr) -> Self {
|
||||
self.inner.exprs.push(expr);
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds a regex matching expression from the provided column name and
|
||||
/// pattern. Values not matching the regex will be filtered out.
|
||||
pub fn build_regex_match_expr(self, column: &str, pattern: impl Into<String>) -> Self {
|
||||
self.regex_match_expr(column, pattern, true)
|
||||
}
|
||||
|
||||
/// Builds a regex "not matching" expression from the provided column name
|
||||
/// and pattern. Values *matching* the regex will be filtered out.
|
||||
pub fn build_regex_not_match_expr(self, column: &str, pattern: impl Into<String>) -> Self {
|
||||
self.regex_match_expr(column, pattern, false)
|
||||
}
|
||||
|
||||
fn regex_match_expr(mut self, column: &str, pattern: impl Into<String>, matches: bool) -> Self {
|
||||
let expr = crate::regex::regex_match_expr(col(column), pattern.into(), matches);
|
||||
self.inner.exprs.push(expr);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets field_column restriction
|
||||
pub fn field_columns(mut self, columns: Vec<impl Into<String>>) -> Self {
|
||||
// We need to distinguish predicates like `column_name In
|
||||
// (foo, bar)` and `column_name = foo and column_name = bar` in order to handle
|
||||
// this
|
||||
if self.inner.field_columns.is_some() {
|
||||
unimplemented!("Complex/Multi field predicates are not yet supported");
|
||||
}
|
||||
|
||||
let column_names = columns
|
||||
.into_iter()
|
||||
.map(|s| s.into())
|
||||
.collect::<BTreeSet<_>>();
|
||||
|
||||
self.inner.field_columns = Some(column_names);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the partition key restriction
|
||||
pub fn partition_key(mut self, partition_key: impl Into<String>) -> Self {
|
||||
assert!(
|
||||
self.inner.partition_key.is_none(),
|
||||
"multiple partition key predicates not suported"
|
||||
);
|
||||
self.inner.partition_key = Some(partition_key.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Create a predicate, consuming this builder
|
||||
pub fn build(self) -> Predicate {
|
||||
self.inner
|
||||
}
|
||||
|
||||
/// Adds only the expressions from `filters` that can be pushed down to
|
||||
/// execution engines.
|
||||
pub fn add_pushdown_exprs(mut self, filters: &[Expr]) -> Self {
|
||||
// For each expression of the filters, recursively split it, if it is is an AND conjunction
|
||||
// For example, expression (x AND y) will be split into a vector of 2 expressions [x, y]
|
||||
let mut exprs = vec![];
|
||||
filters
|
||||
.iter()
|
||||
.for_each(|expr| Self::split_members(expr, &mut exprs));
|
||||
|
||||
// Only keep single_column and primitive binary expressions
|
||||
let mut pushdown_exprs: Vec<Expr> = vec![];
|
||||
let exprs_result = exprs
|
||||
.into_iter()
|
||||
.try_for_each::<_, Result<_, DataFusionError>>(|expr| {
|
||||
let mut columns = HashSet::new();
|
||||
utils::expr_to_columns(&expr, &mut columns)?;
|
||||
|
||||
if columns.len() == 1 && Self::primitive_binary_expr(&expr) {
|
||||
pushdown_exprs.push(expr);
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
match exprs_result {
|
||||
Ok(()) => {
|
||||
// Return the builder with only the pushdownable expressions on it.
|
||||
self.inner.exprs.append(&mut pushdown_exprs);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Error, {}, building push-down predicates for filters: {:#?}. No predicates are pushed down", e, filters);
|
||||
}
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Recursively split all "AND" expressions into smaller one
|
||||
/// Example: "A AND B AND C" => [A, B, C]
|
||||
pub fn split_members(predicate: &Expr, predicates: &mut Vec<Expr>) {
|
||||
match predicate {
|
||||
Expr::BinaryExpr {
|
||||
right,
|
||||
op: Operator::And,
|
||||
left,
|
||||
} => {
|
||||
Self::split_members(left, predicates);
|
||||
Self::split_members(right, predicates);
|
||||
}
|
||||
other => predicates.push(other.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return true if the given expression is in a primitive binary in the form: `column op constant`
|
||||
// and op must be a comparison one
|
||||
pub fn primitive_binary_expr(expr: &Expr) -> bool {
|
||||
match expr {
|
||||
Expr::BinaryExpr { left, op, right } => {
|
||||
matches!(
|
||||
(&**left, &**right),
|
||||
(Expr::Column(_), Expr::Literal(_)) | (Expr::Literal(_), Expr::Column(_))
|
||||
) && matches!(
|
||||
op,
|
||||
Operator::Eq
|
||||
| Operator::NotEq
|
||||
| Operator::Lt
|
||||
| Operator::LtEq
|
||||
| Operator::Gt
|
||||
| Operator::GtEq
|
||||
)
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A representation of the `BinaryExpr` variant of a Datafusion expression.
|
||||
#[derive(Clone, Debug, PartialEq, PartialOrd)]
|
||||
pub struct BinaryExpr {
|
||||
pub left: Column,
|
||||
pub op: Operator,
|
||||
pub right: Expr,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
|
||||
#[test]
|
||||
fn test_default_predicate_is_empty() {
|
||||
let p = Predicate::default();
|
||||
assert!(p.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_non_default_predicate_is_not_empty() {
|
||||
let p = PredicateBuilder::new().timestamp_range(1, 100).build();
|
||||
|
||||
assert!(!p.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pushdown_predicates() {
|
||||
let mut filters = vec![];
|
||||
|
||||
// state = CA
|
||||
let expr1 = col("state").eq(lit("CA"));
|
||||
filters.push(expr1);
|
||||
|
||||
// "price > 10"
|
||||
let expr2 = col("price").gt(lit(10));
|
||||
filters.push(expr2);
|
||||
|
||||
// a < 10 AND b >= 50 --> will be split to [a < 10, b >= 50]
|
||||
let expr3 = col("a").lt(lit(10)).and(col("b").gt_eq(lit(50)));
|
||||
filters.push(expr3);
|
||||
|
||||
// c != 3 OR d = 8 --> won't be pushed down
|
||||
let expr4 = col("c").not_eq(lit(3)).or(col("d").eq(lit(8)));
|
||||
filters.push(expr4);
|
||||
|
||||
// e is null --> won't be pushed down
|
||||
let expr5 = col("e").is_null();
|
||||
filters.push(expr5);
|
||||
|
||||
// f <= 60
|
||||
let expr6 = col("f").lt_eq(lit(60));
|
||||
filters.push(expr6);
|
||||
|
||||
// g is not null --> won't be pushed down
|
||||
let expr7 = col("g").is_not_null();
|
||||
filters.push(expr7);
|
||||
|
||||
// h + i --> won't be pushed down
|
||||
let expr8 = col("h") + col("i");
|
||||
filters.push(expr8);
|
||||
|
||||
// city = Boston
|
||||
let expr9 = col("city").eq(lit("Boston"));
|
||||
filters.push(expr9);
|
||||
|
||||
// city != Braintree
|
||||
let expr9 = col("city").not_eq(lit("Braintree"));
|
||||
filters.push(expr9);
|
||||
|
||||
// city != state --> won't be pushed down
|
||||
let expr10 = col("city").not_eq(col("state"));
|
||||
filters.push(expr10);
|
||||
|
||||
// city = state --> won't be pushed down
|
||||
let expr11 = col("city").eq(col("state"));
|
||||
filters.push(expr11);
|
||||
|
||||
// city_state = city + state --> won't be pushed down
|
||||
let expr12 = col("city_sate").eq(col("city") + col("state"));
|
||||
filters.push(expr12);
|
||||
|
||||
// city = city + 5 --> won't be pushed down
|
||||
let expr13 = col("city").eq(col("city") + lit(5));
|
||||
filters.push(expr13);
|
||||
|
||||
// city = city --> won't be pushed down
|
||||
let expr14 = col("city").eq(col("city"));
|
||||
filters.push(expr14);
|
||||
|
||||
// city + 5 = city --> won't be pushed down
|
||||
let expr15 = (col("city") + lit(5)).eq(col("city"));
|
||||
filters.push(expr15);
|
||||
|
||||
// 5 = city
|
||||
let expr16 = lit(5).eq(col("city"));
|
||||
filters.push(expr16);
|
||||
|
||||
println!(" --------------- Filters: {:#?}", filters);
|
||||
|
||||
// Expected pushdown predicates: [state = CA, price > 10, a < 10, b >= 50, f <= 60, city = Boston, city != Braintree, 5 = city]
|
||||
let predicate = PredicateBuilder::default()
|
||||
.add_pushdown_exprs(&filters)
|
||||
.build();
|
||||
|
||||
println!(" ------------- Predicates: {:#?}", predicate);
|
||||
assert_eq!(predicate.exprs.len(), 8);
|
||||
assert_eq!(predicate.exprs[0], col("state").eq(lit("CA")));
|
||||
assert_eq!(predicate.exprs[1], col("price").gt(lit(10)));
|
||||
assert_eq!(predicate.exprs[2], col("a").lt(lit(10)));
|
||||
assert_eq!(predicate.exprs[3], col("b").gt_eq(lit(50)));
|
||||
assert_eq!(predicate.exprs[4], col("f").lt_eq(lit(60)));
|
||||
assert_eq!(predicate.exprs[5], col("city").eq(lit("Boston")));
|
||||
assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree")));
|
||||
assert_eq!(predicate.exprs[7], lit(5).eq(col("city")));
|
||||
}
|
||||
#[test]
|
||||
fn predicate_display_ts() {
|
||||
// TODO make this a doc example?
|
||||
let p = PredicateBuilder::new().timestamp_range(1, 100).build();
|
||||
|
||||
assert_eq!(p.to_string(), "Predicate range: [1 - 100]");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn predicate_display_ts_and_expr() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(1, 100)
|
||||
.add_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11))))
|
||||
.build();
|
||||
|
||||
assert_eq!(
|
||||
p.to_string(),
|
||||
"Predicate range: [1 - 100] exprs: [#foo = Int32(42) AND #bar < Int32(11)]"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn predicate_display_full() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(1, 100)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.field_columns(vec!["f1", "f2"])
|
||||
.partition_key("the_key")
|
||||
.build();
|
||||
|
||||
assert_eq!(p.to_string(), "Predicate field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_timestamp_if_max_range_out_of_range() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(1, 100)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
|
||||
let expected = p.clone();
|
||||
|
||||
// no rewrite
|
||||
assert_eq!(p.clear_timestamp_if_max_range(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_timestamp_if_max_range_out_of_range_low() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(MIN_NANO_TIME, 100)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
|
||||
let expected = p.clone();
|
||||
|
||||
// no rewrite
|
||||
assert_eq!(p.clear_timestamp_if_max_range(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_timestamp_if_max_range_out_of_range_high() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(0, MAX_NANO_TIME)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
|
||||
let expected = p.clone();
|
||||
|
||||
// no rewrite
|
||||
assert_eq!(p.clear_timestamp_if_max_range(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clear_timestamp_if_max_range_in_range() {
|
||||
let p = PredicateBuilder::new()
|
||||
.timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME)
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
|
||||
let expected = PredicateBuilder::new()
|
||||
.add_expr(col("foo").eq(lit(42)))
|
||||
.build();
|
||||
// rewrite
|
||||
assert_eq!(p.clear_timestamp_if_max_range(), expected);
|
||||
}
|
||||
}
|
|
@ -1,7 +1,6 @@
|
|||
//! Interface logic between IOx ['Predicate`] and predicates used by the
|
||||
//! InfluxDB Storage gRPC API
|
||||
use crate::predicate::{BinaryExpr, Predicate};
|
||||
use crate::rewrite;
|
||||
use crate::{rewrite, BinaryExpr, Predicate};
|
||||
|
||||
use datafusion::error::Result as DataFusionResult;
|
||||
use datafusion::execution::context::ExecutionProps;
|
||||
|
|
|
@ -18,8 +18,8 @@ use datafusion_util::AsExpr;
|
|||
|
||||
use hashbrown::HashSet;
|
||||
use observability_deps::tracing::{debug, trace};
|
||||
use predicate::predicate::{BinaryExpr, Predicate, PredicateMatch};
|
||||
use predicate::rpc_predicate::{InfluxRpcPredicate, FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME};
|
||||
use predicate::{BinaryExpr, Predicate, PredicateMatch};
|
||||
use schema::selection::Selection;
|
||||
use schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME};
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
|
@ -1834,7 +1834,7 @@ impl<'a> ExprRewriter for MissingColumnsToNull<'a> {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datafusion::logical_plan::lit;
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::PredicateBuilder;
|
||||
use schema::builder::SchemaBuilder;
|
||||
|
||||
use crate::{
|
||||
|
|
|
@ -16,10 +16,7 @@ use data_types::{
|
|||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use exec::stringset::StringSet;
|
||||
use observability_deps::tracing::{debug, trace};
|
||||
use predicate::{
|
||||
predicate::{Predicate, PredicateMatch},
|
||||
rpc_predicate::QueryDatabaseMeta,
|
||||
};
|
||||
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate, PredicateMatch};
|
||||
use schema::selection::Selection;
|
||||
use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ use datafusion::{
|
|||
},
|
||||
};
|
||||
use observability_deps::tracing::{debug, trace};
|
||||
use predicate::predicate::{Predicate, PredicateBuilder};
|
||||
use predicate::{Predicate, PredicateBuilder};
|
||||
use schema::{merge::SchemaMerger, sort::SortKey, Schema};
|
||||
|
||||
use crate::{
|
||||
|
|
|
@ -16,7 +16,7 @@ use schema::selection::Selection;
|
|||
use schema::Schema;
|
||||
|
||||
use crate::QueryChunk;
|
||||
use predicate::predicate::Predicate;
|
||||
use predicate::Predicate;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ use datafusion::{
|
|||
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
|
||||
};
|
||||
use observability_deps::tracing::{debug, trace};
|
||||
use predicate::predicate::Predicate;
|
||||
use predicate::Predicate;
|
||||
use schema::Schema;
|
||||
|
||||
use crate::{group_by::Aggregate, QueryChunkMeta};
|
||||
|
@ -228,7 +228,7 @@ mod test {
|
|||
use std::{cell::RefCell, sync::Arc};
|
||||
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::PredicateBuilder;
|
||||
use schema::merge::SchemaMerger;
|
||||
|
||||
use crate::{test::TestChunk, QueryChunk};
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use arrow::datatypes::DataType;
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::fieldlist::{Field, FieldList},
|
||||
frontend::influxrpc::InfluxRpcPlanner,
|
||||
|
|
|
@ -13,8 +13,8 @@ use crate::{
|
|||
},
|
||||
};
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::frontend::influxrpc::InfluxRpcPlanner;
|
||||
|
||||
/// runs read_filter(predicate) and compares it to the expected
|
||||
|
|
|
@ -14,8 +14,8 @@ use datafusion::{
|
|||
logical_plan::{binary_expr, Operator},
|
||||
prelude::*,
|
||||
};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{frontend::influxrpc::InfluxRpcPlanner, group_by::Aggregate};
|
||||
|
||||
/// runs read_group(predicate) and compares it to the expected
|
||||
|
|
|
@ -10,8 +10,8 @@ use async_trait::async_trait;
|
|||
use data_types::{delete_predicate::DeletePredicate, timestamp::TimestampRange};
|
||||
use datafusion::prelude::*;
|
||||
use db::{test_helpers::write_lp, utils::make_db};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
frontend::influxrpc::InfluxRpcPlanner,
|
||||
group_by::{Aggregate, WindowDuration},
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! Tests for the Influx gRPC queries
|
||||
use datafusion::logical_plan::{col, lit};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::stringset::{IntoStringSet, StringSetRef},
|
||||
frontend::influxrpc::InfluxRpcPlanner,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use datafusion::logical_plan::{col, lit};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::stringset::{IntoStringSet, StringSetRef},
|
||||
frontend::influxrpc::InfluxRpcPlanner,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use datafusion::logical_plan::{col, lit};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::stringset::{IntoStringSet, StringSetRef},
|
||||
frontend::influxrpc::InfluxRpcPlanner,
|
||||
|
|
|
@ -5,8 +5,8 @@ use db::{
|
|||
utils::{make_db, TestDb},
|
||||
};
|
||||
use metric::{Attributes, Metric, U64Counter};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::{stringset::StringSet, ExecutionContextProvider},
|
||||
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
|
||||
|
|
|
@ -5,8 +5,8 @@ use std::io::Read;
|
|||
// current-thread executor
|
||||
use db::Db;
|
||||
use flate2::read::GzDecoder;
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::{Executor, ExecutorType},
|
||||
frontend::influxrpc::InfluxRpcPlanner,
|
||||
|
|
|
@ -5,8 +5,8 @@ use std::io::Read;
|
|||
// current-thread executor
|
||||
use db::Db;
|
||||
use flate2::read::GzDecoder;
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::{Executor, ExecutorType},
|
||||
frontend::influxrpc::InfluxRpcPlanner,
|
||||
|
|
|
@ -5,8 +5,8 @@ use std::io::Read;
|
|||
// current-thread executor
|
||||
use db::Db;
|
||||
use flate2::read::GzDecoder;
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
use predicate::rpc_predicate::InfluxRpcPredicate;
|
||||
use predicate::PredicateBuilder;
|
||||
use query::{
|
||||
exec::{Executor, ExecutorType},
|
||||
frontend::influxrpc::InfluxRpcPlanner,
|
||||
|
|
Loading…
Reference in New Issue