From 2e30483f1f852f881fc5b0ad1c8ceb0486a61e22 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" <193874+carols10cents@users.noreply.github.com> Date: Mon, 7 Feb 2022 09:54:07 -0500 Subject: [PATCH] refactor: Remove predicate module from predicate crate (#3648) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- db/src/access.rs | 4 +- db/src/chunk.rs | 2 +- db/src/lib.rs | 2 +- db/src/pred.rs | 4 +- .../server_type/database/rpc/storage/expr.rs | 4 +- .../database/rpc/storage/service.rs | 4 +- ingester/src/query.rs | 5 +- parquet_file/src/chunk.rs | 2 +- parquet_file/src/storage.rs | 2 +- predicate/src/delete_predicate.rs | 2 +- predicate/src/lib.rs | 643 ++++++++++++++++- predicate/src/predicate.rs | 648 ------------------ predicate/src/rpc_predicate.rs | 3 +- query/src/frontend/influxrpc.rs | 4 +- query/src/lib.rs | 5 +- query/src/provider.rs | 2 +- query/src/provider/physical.rs | 2 +- query/src/pruning.rs | 4 +- query_tests/src/influxrpc/field_columns.rs | 2 +- query_tests/src/influxrpc/read_filter.rs | 2 +- query_tests/src/influxrpc/read_group.rs | 2 +- .../src/influxrpc/read_window_aggregate.rs | 2 +- query_tests/src/influxrpc/table_names.rs | 2 +- query_tests/src/influxrpc/tag_keys.rs | 2 +- query_tests/src/influxrpc/tag_values.rs | 2 +- query_tests/src/pruning.rs | 2 +- server_benchmarks/benches/read_filter.rs | 2 +- server_benchmarks/benches/read_group.rs | 2 +- server_benchmarks/benches/tag_values.rs | 2 +- 29 files changed, 675 insertions(+), 689 deletions(-) delete mode 100644 predicate/src/predicate.rs diff --git a/db/src/access.rs b/db/src/access.rs index 3e05728d62..b3d9519a22 100644 --- a/db/src/access.rs +++ b/db/src/access.rs @@ -14,7 +14,7 @@ use job_registry::JobRegistry; use metric::{Attributes, DurationCounter, Metric, U64Counter}; use observability_deps::tracing::debug; use parking_lot::Mutex; -use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta}; +use 
predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; use query::{ provider::{ChunkPruner, ProviderBuilder}, pruning::{prune_chunks, PruningObserver}, @@ -398,7 +398,7 @@ mod tests { use super::*; use crate::test_helpers::write_lp; use crate::utils::make_db; - use predicate::predicate::PredicateBuilder; + use predicate::PredicateBuilder; #[tokio::test] async fn test_filtered_chunks() { diff --git a/db/src/chunk.rs b/db/src/chunk.rs index 8ab333ec7e..e1c8e7824c 100644 --- a/db/src/chunk.rs +++ b/db/src/chunk.rs @@ -15,7 +15,7 @@ use mutable_buffer::snapshot::ChunkSnapshot; use observability_deps::tracing::debug; use parquet_file::chunk::ParquetChunk; use partition_metadata::TableSummary; -use predicate::predicate::{Predicate, PredicateMatch}; +use predicate::{Predicate, PredicateMatch}; use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta}; use read_buffer::RBChunk; use schema::InfluxColumnType; diff --git a/db/src/lib.rs b/db/src/lib.rs index 018d028156..15f82d5899 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -42,7 +42,7 @@ use parquet_catalog::{ prune::prune_history as prune_catalog_transaction_history, }; use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows}; -use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta}; +use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; use query::{ exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext}, QueryCompletedToken, QueryDatabase, diff --git a/db/src/pred.rs b/db/src/pred.rs index 0b8f456a2f..2af8dd2820 100644 --- a/db/src/pred.rs +++ b/db/src/pred.rs @@ -3,7 +3,7 @@ use std::convert::TryFrom; -use predicate::predicate::Predicate; +use predicate::Predicate; use snafu::Snafu; #[derive(Debug, Snafu)] @@ -55,7 +55,7 @@ pub mod test { use datafusion::logical_plan::{col, lit, Expr}; use datafusion::scalar::ScalarValue; - use predicate::predicate::PredicateBuilder; + use predicate::PredicateBuilder; use 
read_buffer::BinaryExpr as RBBinaryExpr; use read_buffer::Predicate as RBPredicate; diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/expr.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/expr.rs index a05fe82ebd..e311d88b09 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/expr.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/expr.rs @@ -26,9 +26,9 @@ use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT}; use observability_deps::tracing::warn; use predicate::rpc_predicate::InfluxRpcPredicate; use predicate::{ - predicate::PredicateBuilder, regex::regex_match_expr, rpc_predicate::{FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME}, + PredicateBuilder, }; use query::group_by::{Aggregate as QueryAggregate, WindowDuration}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -867,7 +867,7 @@ fn format_comparison(v: i32, f: &mut fmt::Formatter<'_>) -> fmt::Result { #[cfg(test)] mod tests { use generated_types::node::Type as RPCNodeType; - use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta}; + use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; use std::{collections::BTreeSet, sync::Arc}; use super::*; diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs index 7aa9fb5cc1..b1faba9b0a 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/service.rs @@ -1351,7 +1351,7 @@ mod tests { Client as StorageClient, OrgAndBucket, }; use panic_logging::SendPanicsToTracing; - use predicate::predicate::{PredicateBuilder, PredicateMatch}; + use predicate::{PredicateBuilder, PredicateMatch}; use query::{ exec::Executor, test::{TestChunk, TestDatabase, TestError}, @@ -2971,7 +2971,7 @@ mod tests { db_name: &str, partition_key: &str, chunk_id: u128, - expected_predicate: 
&predicate::predicate::Predicate, + expected_predicate: &predicate::Predicate, ) { let actual_predicates = self .test_storage diff --git a/ingester/src/query.rs b/ingester/src/query.rs index 0c783669f6..f63393cb22 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -15,10 +15,7 @@ use datafusion::physical_plan::{ SendableRecordBatchStream, }; use iox_catalog::interface::{SequenceNumber, Tombstone}; -use predicate::{ - delete_predicate::parse_delete_predicate, - predicate::{Predicate, PredicateMatch}, -}; +use predicate::{delete_predicate::parse_delete_predicate, Predicate, PredicateMatch}; use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta}; use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema}; use snafu::{ResultExt, Snafu}; diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 6b8c149444..0fe2f328d4 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -5,7 +5,7 @@ use data_types::{ }; use datafusion::physical_plan::SendableRecordBatchStream; use iox_object_store::{IoxObjectStore, ParquetFilePath}; -use predicate::predicate::Predicate; +use predicate::Predicate; use schema::selection::Selection; use schema::{Schema, TIME_COLUMN_NAME}; use snafu::{ResultExt, Snafu}; diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 1d231892a6..90600d6809 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -22,7 +22,7 @@ use parquet::{ basic::Compression, file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone}, }; -use predicate::predicate::Predicate; +use predicate::Predicate; use schema::selection::Selection; use snafu::{OptionExt, ResultExt, Snafu}; use std::{ diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index cedaddf34a..95a6114ff5 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -68,7 +68,7 @@ pub enum Error { /// Result 
type for Parser Cient pub type Result = std::result::Result; -impl From for crate::predicate::Predicate { +impl From for crate::Predicate { fn from(pred: DeletePredicate) -> Self { Self { field_columns: None, diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index da8da2042e..4b6728d1a3 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -10,7 +10,648 @@ pub mod delete_expr; pub mod delete_predicate; -pub mod predicate; pub mod regex; pub mod rewrite; pub mod rpc_predicate; + +use data_types::timestamp::{TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME}; +use datafusion::{ + error::DataFusionError, + logical_plan::{col, lit_timestamp_nano, Column, Expr, Operator}, + optimizer::utils, +}; +use datafusion_util::{make_range_expr, AndExprBuilder}; +use observability_deps::tracing::debug; +use schema::TIME_COLUMN_NAME; +use std::{ + collections::{BTreeSet, HashSet}, + fmt, +}; + +/// This `Predicate` represents the empty predicate (aka that evaluates to true for all rows). +pub const EMPTY_PREDICATE: Predicate = Predicate { + field_columns: None, + exprs: vec![], + range: None, + partition_key: None, + value_expr: vec![], +}; + +/// A unified Predicate structure for IOx queries that can select and filter Fields and Tags from +/// the InfluxDB data model, as well as for arbitrary other predicates that are expressed by +/// DataFusion's [`Expr`] type. +/// +/// Note that the InfluxDB data model (e.g. ParsedLine's) distinguishes between some types of +/// columns (tags and fields), and likewise the semantics of this structure can express some types +/// of restrictions that only apply to certain types of columns. +#[derive(Clone, Debug, Default, PartialEq, PartialOrd)] +pub struct Predicate { + /// Optional field restriction. If present, restricts the results to only + /// tables which have *at least one* of the fields in field_columns.
+ pub field_columns: Option>, + + /// Optional partition key filter + pub partition_key: Option, + + /// Optional timestamp range: only rows within this range are included in + /// results. Other rows are excluded + pub range: Option, + + /// Optional arbitrary predicates, represented as list of + /// DataFusion expressions applied a logical conjunction (aka they + /// are 'AND'ed together). Only rows that evaluate to TRUE for all + /// these expressions should be returned. Other rows are excluded + /// from the results. + pub exprs: Vec, + + /// Optional arbitrary predicates on the special `_value` column. These + /// expressions are applied to `field_columns` projections in the form of + /// `CASE` statement conditions. + pub value_expr: Vec, +} + +impl Predicate { + /// Return true if this predicate has any general purpose predicates + pub fn has_exprs(&self) -> bool { + !self.exprs.is_empty() + } + + /// Return a DataFusion `Expr` predicate representing the + /// combination of all predicate (`exprs`) and timestamp + /// restriction in this Predicate. 
Returns None if there are no + /// `Expr`'s restricting the data + pub fn filter_expr(&self) -> Option { + let mut builder = + AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr()); + + for expr in &self.exprs { + builder = builder.append_expr(expr.clone()); + } + + builder.build() + } + + /// Return true if the field should be included in results + pub fn should_include_field(&self, field_name: &str) -> bool { + match &self.field_columns { + None => true, // No field restriction on predicate + Some(field_names) => field_names.contains(field_name), + } + } + + /// Creates a DataFusion predicate for applying a timestamp range: + /// + /// `range.start <= time and time < range.end` + fn make_timestamp_predicate_expr(&self) -> Option { + self.range + .map(|range| make_range_expr(range.start(), range.end(), TIME_COLUMN_NAME)) + } + + /// Returns true if this predicate evaluates to true for all rows + pub fn is_empty(&self) -> bool { + self == &EMPTY_PREDICATE + } + + /// Return a negated DF logical expression for the given delete predicates + pub fn negated_expr(delete_predicates: &[S]) -> Option + where + S: AsRef, + { + if delete_predicates.is_empty() { + return None; + } + + let mut pred = PredicateBuilder::default().build(); + pred.merge_delete_predicates(delete_predicates); + + // Make a conjunctive expression of the pred.exprs + let mut val = None; + for e in pred.exprs { + match val { + None => val = Some(e), + Some(expr) => val = Some(expr.and(e)), + } + } + + val + } + + /// Merge the given delete predicates into this select predicate. + /// Since we want to eliminate data filtered by the delete predicates, + /// they are first converted into their negated form: NOT(delete_predicate) + /// then added/merged into the selection one + pub fn merge_delete_predicates(&mut self, delete_predicates: &[S]) + where + S: AsRef, + { + // Create a list of disjunctive negated expressions.
+ // Example: there are two deletes as follows (note that time_range is stored separately in the Predicate + // but we need to put it together with the exprs here) + // . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30] + // . Delete_2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50] + // The negated list will be "NOT(Delete_1)", "NOT(Delete_2)" which means + // NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30]), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50]) which means + // [NOT(city != "Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30])], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50])] + // Note that "NOT(time_range in [20, 50])", i.e. "NOT(20 <= time <= 50)", is replaced with "time < 20 OR time > 50" + + for pred in delete_predicates { + let pred = pred.as_ref(); + + let mut expr: Option = None; + + // Time range + if let Some(range) = pred.range { + // time_expr = NOT(start <= time_range <= end) + // Equivalent to: (time < start OR time > end) + let time_expr = col(TIME_COLUMN_NAME) + .lt(lit_timestamp_nano(range.start())) + .or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end()))); + + match expr { + None => expr = Some(time_expr), + Some(e) => expr = Some(e.or(time_expr)), + } + } + + // Exprs + for exp in &pred.exprs { + match expr { + None => expr = Some(exp.clone().not()), + Some(e) => expr = Some(e.or(exp.clone().not())), + } + } + + // Push the negated expression of the delete predicate into the list exprs of the select predicate + if let Some(e) = expr { + self.exprs.push(e); + } + } + } + + /// Removes the timestamp range from this predicate, if the range + /// is for the entire min/max valid range.
+ /// + /// This is used in certain cases to retain compatibility with the + /// existing storage engine + pub(crate) fn clear_timestamp_if_max_range(mut self) -> Self { + self.range = self.range.take().and_then(|range| { + if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME { + None + } else { + Some(range) + } + }); + + self + } +} + +impl fmt::Display for Predicate { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fn iter_to_str(s: impl IntoIterator) -> String + where + S: ToString, + { + s.into_iter() + .map(|v| v.to_string()) + .collect::>() + .join(", ") + } + + write!(f, "Predicate")?; + + if let Some(field_columns) = &self.field_columns { + write!(f, " field_columns: {{{}}}", iter_to_str(field_columns))?; + } + + if let Some(partition_key) = &self.partition_key { + write!(f, " partition_key: '{}'", partition_key)?; + } + + if let Some(range) = &self.range { + // TODO: could be nice to show this as actual timestamps (not just numbers)? + write!(f, " range: [{} - {}]", range.start(), range.end())?; + } + + if !self.exprs.is_empty() { + write!(f, " exprs: [")?; + for (i, expr) in self.exprs.iter().enumerate() { + write!(f, "{}", expr)?; + if i < self.exprs.len() - 1 { + write!(f, ", ")?; + } + } + write!(f, "]")?; + } + Ok(()) + } +} + +#[derive(Debug, Clone, Copy)] +/// The result of evaluating a predicate on a set of rows +pub enum PredicateMatch { + /// There is at least one row that matches the predicate that has + /// at least one non null value in each field of the predicate + AtLeastOneNonNullField, + + /// There are exactly zero rows that match the predicate + Zero, + + /// There *may* be rows that match, OR there *may* be no rows that + /// match + Unknown, +} + +/// Structure for building [`Predicate`]s +/// +/// Example: +/// ``` +/// use predicate::PredicateBuilder; +/// use datafusion::logical_plan::{col, lit}; +/// +/// let p = PredicateBuilder::new() +/// .timestamp_range(1, 100) +/// 
.add_expr(col("foo").eq(lit(42))) +/// .build(); +/// +/// assert_eq!( +/// p.to_string(), +/// "Predicate range: [1 - 100] exprs: [#foo = Int32(42)]" +/// ); +/// ``` +#[derive(Debug, Default)] +pub struct PredicateBuilder { + inner: Predicate, +} + +impl From for PredicateBuilder { + fn from(inner: Predicate) -> Self { + Self { inner } + } +} + +impl PredicateBuilder { + pub fn new() -> Self { + Self::default() + } + + /// Sets the timestamp range + pub fn timestamp_range(mut self, start: i64, end: i64) -> Self { + // Without more thought, redefining the timestamp range would + // lose the old range. Assert that that cannot happen. + assert!( + self.inner.range.is_none(), + "Unexpected re-definition of timestamp range" + ); + + self.inner.range = Some(TimestampRange::new(start, end)); + self + } + + /// Sets the optional timestamp range, if any + pub fn timestamp_range_option(mut self, range: Option) -> Self { + // Without more thought, redefining the timestamp range would + // lose the old range. Assert that that cannot happen. + assert!( + range.is_none() || self.inner.range.is_none(), + "Unexpected re-definition of timestamp range" + ); + self.inner.range = range; + self + } + + /// Adds an expression to the list of general purpose predicates + pub fn add_expr(mut self, expr: Expr) -> Self { + self.inner.exprs.push(expr); + self + } + + /// Builds a regex matching expression from the provided column name and + /// pattern. Values not matching the regex will be filtered out. + pub fn build_regex_match_expr(self, column: &str, pattern: impl Into) -> Self { + self.regex_match_expr(column, pattern, true) + } + + /// Builds a regex "not matching" expression from the provided column name + /// and pattern. Values *matching* the regex will be filtered out.
+ pub fn build_regex_not_match_expr(self, column: &str, pattern: impl Into) -> Self { + self.regex_match_expr(column, pattern, false) + } + + fn regex_match_expr(mut self, column: &str, pattern: impl Into, matches: bool) -> Self { + let expr = crate::regex::regex_match_expr(col(column), pattern.into(), matches); + self.inner.exprs.push(expr); + self + } + + /// Sets field_column restriction + pub fn field_columns(mut self, columns: Vec>) -> Self { + // We need to distinguish predicates like `column_name In + // (foo, bar)` and `column_name = foo and column_name = bar` in order to handle + // this + if self.inner.field_columns.is_some() { + unimplemented!("Complex/Multi field predicates are not yet supported"); + } + + let column_names = columns + .into_iter() + .map(|s| s.into()) + .collect::>(); + + self.inner.field_columns = Some(column_names); + self + } + + /// Set the partition key restriction + pub fn partition_key(mut self, partition_key: impl Into) -> Self { + assert!( + self.inner.partition_key.is_none(), + "multiple partition key predicates not suported" + ); + self.inner.partition_key = Some(partition_key.into()); + self + } + + /// Create a predicate, consuming this builder + pub fn build(self) -> Predicate { + self.inner + } + + /// Adds only the expressions from `filters` that can be pushed down to + /// execution engines. 
+ pub fn add_pushdown_exprs(mut self, filters: &[Expr]) -> Self { + // For each expression of the filters, recursively split it, if it is an AND conjunction + // For example, expression (x AND y) will be split into a vector of 2 expressions [x, y] + let mut exprs = vec![]; + filters + .iter() + .for_each(|expr| Self::split_members(expr, &mut exprs)); + + // Only keep single_column and primitive binary expressions + let mut pushdown_exprs: Vec = vec![]; + let exprs_result = exprs + .into_iter() + .try_for_each::<_, Result<_, DataFusionError>>(|expr| { + let mut columns = HashSet::new(); + utils::expr_to_columns(&expr, &mut columns)?; + + if columns.len() == 1 && Self::primitive_binary_expr(&expr) { + pushdown_exprs.push(expr); + } + Ok(()) + }); + + match exprs_result { + Ok(()) => { + // Return the builder with only the pushdownable expressions on it. + self.inner.exprs.append(&mut pushdown_exprs); + } + Err(e) => { + debug!("Error, {}, building push-down predicates for filters: {:#?}.
No predicates are pushed down", e, filters); + } + } + + self + } + + /// Recursively split all "AND" expressions into smaller ones + /// Example: "A AND B AND C" => [A, B, C] + pub fn split_members(predicate: &Expr, predicates: &mut Vec) { + match predicate { + Expr::BinaryExpr { + right, + op: Operator::And, + left, + } => { + Self::split_members(left, predicates); + Self::split_members(right, predicates); + } + other => predicates.push(other.clone()), + } + } + + /// Return true if the given expression is a primitive binary expression of the form `column op constant` + // and op must be a comparison operator + pub fn primitive_binary_expr(expr: &Expr) -> bool { + match expr { + Expr::BinaryExpr { left, op, right } => { + matches!( + (&**left, &**right), + (Expr::Column(_), Expr::Literal(_)) | (Expr::Literal(_), Expr::Column(_)) + ) && matches!( + op, + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq + ) + } + _ => false, + } + } +} + +// A representation of the `BinaryExpr` variant of a DataFusion expression.
+#[derive(Clone, Debug, PartialEq, PartialOrd)] +pub struct BinaryExpr { + pub left: Column, + pub op: Operator, + pub right: Expr, +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::logical_plan::{col, lit}; + + #[test] + fn test_default_predicate_is_empty() { + let p = Predicate::default(); + assert!(p.is_empty()); + } + + #[test] + fn test_non_default_predicate_is_not_empty() { + let p = PredicateBuilder::new().timestamp_range(1, 100).build(); + + assert!(!p.is_empty()); + } + + #[test] + fn test_pushdown_predicates() { + let mut filters = vec![]; + + // state = CA + let expr1 = col("state").eq(lit("CA")); + filters.push(expr1); + + // "price > 10" + let expr2 = col("price").gt(lit(10)); + filters.push(expr2); + + // a < 10 AND b >= 50 --> will be split to [a < 10, b >= 50] + let expr3 = col("a").lt(lit(10)).and(col("b").gt_eq(lit(50))); + filters.push(expr3); + + // c != 3 OR d = 8 --> won't be pushed down + let expr4 = col("c").not_eq(lit(3)).or(col("d").eq(lit(8))); + filters.push(expr4); + + // e is null --> won't be pushed down + let expr5 = col("e").is_null(); + filters.push(expr5); + + // f <= 60 + let expr6 = col("f").lt_eq(lit(60)); + filters.push(expr6); + + // g is not null --> won't be pushed down + let expr7 = col("g").is_not_null(); + filters.push(expr7); + + // h + i --> won't be pushed down + let expr8 = col("h") + col("i"); + filters.push(expr8); + + // city = Boston + let expr9 = col("city").eq(lit("Boston")); + filters.push(expr9); + + // city != Braintree + let expr9 = col("city").not_eq(lit("Braintree")); + filters.push(expr9); + + // city != state --> won't be pushed down + let expr10 = col("city").not_eq(col("state")); + filters.push(expr10); + + // city = state --> won't be pushed down + let expr11 = col("city").eq(col("state")); + filters.push(expr11); + + // city_state = city + state --> won't be pushed down + let expr12 = col("city_sate").eq(col("city") + col("state")); + filters.push(expr12); + + // city = city + 5 --> 
won't be pushed down + let expr13 = col("city").eq(col("city") + lit(5)); + filters.push(expr13); + + // city = city --> won't be pushed down + let expr14 = col("city").eq(col("city")); + filters.push(expr14); + + // city + 5 = city --> won't be pushed down + let expr15 = (col("city") + lit(5)).eq(col("city")); + filters.push(expr15); + + // 5 = city + let expr16 = lit(5).eq(col("city")); + filters.push(expr16); + + println!(" --------------- Filters: {:#?}", filters); + + // Expected pushdown predicates: [state = CA, price > 10, a < 10, b >= 50, f <= 60, city = Boston, city != Braintree, 5 = city] + let predicate = PredicateBuilder::default() + .add_pushdown_exprs(&filters) + .build(); + + println!(" ------------- Predicates: {:#?}", predicate); + assert_eq!(predicate.exprs.len(), 8); + assert_eq!(predicate.exprs[0], col("state").eq(lit("CA"))); + assert_eq!(predicate.exprs[1], col("price").gt(lit(10))); + assert_eq!(predicate.exprs[2], col("a").lt(lit(10))); + assert_eq!(predicate.exprs[3], col("b").gt_eq(lit(50))); + assert_eq!(predicate.exprs[4], col("f").lt_eq(lit(60))); + assert_eq!(predicate.exprs[5], col("city").eq(lit("Boston"))); + assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree"))); + assert_eq!(predicate.exprs[7], lit(5).eq(col("city"))); + } + #[test] + fn predicate_display_ts() { + // TODO make this a doc example? 
+ let p = PredicateBuilder::new().timestamp_range(1, 100).build(); + + assert_eq!(p.to_string(), "Predicate range: [1 - 100]"); + } + + #[test] + fn predicate_display_ts_and_expr() { + let p = PredicateBuilder::new() + .timestamp_range(1, 100) + .add_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11)))) + .build(); + + assert_eq!( + p.to_string(), + "Predicate range: [1 - 100] exprs: [#foo = Int32(42) AND #bar < Int32(11)]" + ); + } + + #[test] + fn predicate_display_full() { + let p = PredicateBuilder::new() + .timestamp_range(1, 100) + .add_expr(col("foo").eq(lit(42))) + .field_columns(vec!["f1", "f2"]) + .partition_key("the_key") + .build(); + + assert_eq!(p.to_string(), "Predicate field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]"); + } + + #[test] + fn test_clear_timestamp_if_max_range_out_of_range() { + let p = PredicateBuilder::new() + .timestamp_range(1, 100) + .add_expr(col("foo").eq(lit(42))) + .build(); + + let expected = p.clone(); + + // no rewrite + assert_eq!(p.clear_timestamp_if_max_range(), expected); + } + + #[test] + fn test_clear_timestamp_if_max_range_out_of_range_low() { + let p = PredicateBuilder::new() + .timestamp_range(MIN_NANO_TIME, 100) + .add_expr(col("foo").eq(lit(42))) + .build(); + + let expected = p.clone(); + + // no rewrite + assert_eq!(p.clear_timestamp_if_max_range(), expected); + } + + #[test] + fn test_clear_timestamp_if_max_range_out_of_range_high() { + let p = PredicateBuilder::new() + .timestamp_range(0, MAX_NANO_TIME) + .add_expr(col("foo").eq(lit(42))) + .build(); + + let expected = p.clone(); + + // no rewrite + assert_eq!(p.clear_timestamp_if_max_range(), expected); + } + + #[test] + fn test_clear_timestamp_if_max_range_in_range() { + let p = PredicateBuilder::new() + .timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME) + .add_expr(col("foo").eq(lit(42))) + .build(); + + let expected = PredicateBuilder::new() + .add_expr(col("foo").eq(lit(42))) + .build(); + // rewrite + 
assert_eq!(p.clear_timestamp_if_max_range(), expected); + } +} diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs deleted file mode 100644 index 246c6888f0..0000000000 --- a/predicate/src/predicate.rs +++ /dev/null @@ -1,648 +0,0 @@ -//! This module contains a unified Predicate structure for IOx qieries -//! that can select and filter Fields and Tags from the InfluxDB data -//! mode as well as for arbitrary other predicates that are expressed -//! by DataFusion's `Expr` type. - -use std::{ - collections::{BTreeSet, HashSet}, - fmt, -}; - -use data_types::timestamp::{TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME}; -use datafusion::{ - error::DataFusionError, - logical_plan::{col, lit_timestamp_nano, Column, Expr, Operator}, - optimizer::utils, -}; -use datafusion_util::{make_range_expr, AndExprBuilder}; -use observability_deps::tracing::debug; -use schema::TIME_COLUMN_NAME; - -/// This `Predicate` represents the empty predicate (aka that -/// evaluates to true for all rows). -pub const EMPTY_PREDICATE: Predicate = Predicate { - field_columns: None, - exprs: vec![], - range: None, - partition_key: None, - value_expr: vec![], -}; - -#[derive(Debug, Clone, Copy)] -/// The result of evaluating a predicate on a set of rows -pub enum PredicateMatch { - /// There is at least one row that matches the predicate that has - /// at least one non null value in each field of the predicate - AtLeastOneNonNullField, - - /// There are exactly zero rows that match the predicate - Zero, - - /// There *may* be rows that match, OR there *may* be no rows that - /// match - Unknown, -} - -/// Represents a parsed predicate for evaluation by the InfluxDB IOx -/// query engine. -/// -/// Note that the InfluxDB data model (e.g. ParsedLine's) -/// distinguishes between some types of columns (tags and fields), and -/// likewise the semantics of this structure can express some types of -/// restrictions that only apply to certain types of columns. 
-#[derive(Clone, Debug, Default, PartialEq, PartialOrd)] -pub struct Predicate { - /// Optional field restriction. If present, restricts the results to only - /// tables which have *at least one* of the fields in field_columns. - pub field_columns: Option>, - - /// Optional partition key filter - pub partition_key: Option, - - /// Optional timestamp range: only rows within this range are included in - /// results. Other rows are excluded - pub range: Option, - - /// Optional arbitrary predicates, represented as list of - /// DataFusion expressions applied a logical conjunction (aka they - /// are 'AND'ed together). Only rows that evaluate to TRUE for all - /// these expressions should be returned. Other rows are excluded - /// from the results. - pub exprs: Vec, - - /// Optional arbitrary predicates on the special `_value` column. These - /// expressions are applied to `field_columns` projections in the form of - /// `CASE` statement conditions. - pub value_expr: Vec, -} - -impl Predicate { - /// Return true if this predicate has any general purpose predicates - pub fn has_exprs(&self) -> bool { - !self.exprs.is_empty() - } - - /// Return a DataFusion `Expr` predicate representing the - /// combination of all predicate (`exprs`) and timestamp - /// restriction in this Predicate. 
Returns None if there are no - /// `Expr`'s restricting the data - pub fn filter_expr(&self) -> Option { - let mut builder = - AndExprBuilder::default().append_opt(self.make_timestamp_predicate_expr()); - - for expr in &self.exprs { - builder = builder.append_expr(expr.clone()); - } - - builder.build() - } - - /// Return true if the field should be included in results - pub fn should_include_field(&self, field_name: &str) -> bool { - match &self.field_columns { - None => true, // No field restriction on predicate - Some(field_names) => field_names.contains(field_name), - } - } - - /// Creates a DataFusion predicate for appliying a timestamp range: - /// - /// `range.start <= time and time < range.end` - fn make_timestamp_predicate_expr(&self) -> Option { - self.range - .map(|range| make_range_expr(range.start(), range.end(), TIME_COLUMN_NAME)) - } - - /// Returns true if ths predicate evaluates to true for all rows - pub fn is_empty(&self) -> bool { - self == &EMPTY_PREDICATE - } - - /// Return a negated DF logical expression for the given delete predicates - pub fn negated_expr(delete_predicates: &[S]) -> Option - where - S: AsRef, - { - if delete_predicates.is_empty() { - return None; - } - - let mut pred = PredicateBuilder::default().build(); - pred.merge_delete_predicates(delete_predicates); - - // Make a conjunctive expression of the pred.exprs - let mut val = None; - for e in pred.exprs { - match val { - None => val = Some(e), - Some(expr) => val = Some(expr.and(e)), - } - } - - val - } - - /// Merge the given delete predicates into this select predicate. - /// Since we want to eliminate data filtered by the delete predicates, - /// they are first converted into their negated form: NOT(delete_predicate) - /// then added/merged into the selection one - pub fn merge_delete_predicates(&mut self, delete_predicates: &[S]) - where - S: AsRef, - { - // Create a list of disjunctive negated expressions. 
- // Example: there are two deletes as follows (note that time_range is stored separated in the Predicate - // but we need to put it together with the exprs here) - // . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30) - // . Delete 2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50) - // The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means - // NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30]), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50]) which means - // [NOT(city = Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30])], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50])] - // Note that the "NOT(time_range in [20, 50])]" or "NOT(20 <= time <= 50)"" is replaced with "time < 20 OR time > 50" - - for pred in delete_predicates { - let pred = pred.as_ref(); - - let mut expr: Option = None; - - // Time range - if let Some(range) = pred.range { - // time_expr = NOT(start <= time_range <= end) - // Equivalent to: (time < start OR time > end) - let time_expr = col(TIME_COLUMN_NAME) - .lt(lit_timestamp_nano(range.start())) - .or(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(range.end()))); - - match expr { - None => expr = Some(time_expr), - Some(e) => expr = Some(e.or(time_expr)), - } - } - - // Exprs - for exp in &pred.exprs { - match expr { - None => expr = Some(exp.clone().not()), - Some(e) => expr = Some(e.or(exp.clone().not())), - } - } - - // Push the negated expression of the delete predicate into the list exprs of the select predicate - if let Some(e) = expr { - self.exprs.push(e); - } - } - } - - /// Removes the timestamp range from this predicate, if the range - /// is for the entire min/max valid range. 
- /// - /// This is used in certain cases to retain compatibility with the - /// existing storage engine - pub(crate) fn clear_timestamp_if_max_range(mut self) -> Self { - self.range = self.range.take().and_then(|range| { - if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME { - None - } else { - Some(range) - } - }); - - self - } -} - -impl fmt::Display for Predicate { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fn iter_to_str(s: impl IntoIterator) -> String - where - S: ToString, - { - s.into_iter() - .map(|v| v.to_string()) - .collect::>() - .join(", ") - } - - write!(f, "Predicate")?; - - if let Some(field_columns) = &self.field_columns { - write!(f, " field_columns: {{{}}}", iter_to_str(field_columns))?; - } - - if let Some(partition_key) = &self.partition_key { - write!(f, " partition_key: '{}'", partition_key)?; - } - - if let Some(range) = &self.range { - // TODO: could be nice to show this as actual timestamps (not just numbers)? - write!(f, " range: [{} - {}]", range.start(), range.end())?; - } - - if !self.exprs.is_empty() { - write!(f, " exprs: [")?; - for (i, expr) in self.exprs.iter().enumerate() { - write!(f, "{}", expr)?; - if i < self.exprs.len() - 1 { - write!(f, ", ")?; - } - } - write!(f, "]")?; - } - Ok(()) - } -} - -#[derive(Debug, Default)] -/// Structure for building [`Predicate`]s -/// -/// Example: -/// ``` -/// use predicate::predicate::PredicateBuilder; -/// use datafusion::logical_plan::{col, lit}; -/// -/// let p = PredicateBuilder::new() -/// .timestamp_range(1, 100) -/// .add_expr(col("foo").eq(lit(42))) -/// .build(); -/// -/// assert_eq!( -/// p.to_string(), -/// "Predicate range: [1 - 100] exprs: [#foo = Int32(42)]" -/// ); -/// ``` -pub struct PredicateBuilder { - inner: Predicate, -} - -impl From for PredicateBuilder { - fn from(inner: Predicate) -> Self { - Self { inner } - } -} - -impl PredicateBuilder { - pub fn new() -> Self { - Self::default() - } - - /// Sets the timestamp range - pub fn 
timestamp_range(mut self, start: i64, end: i64) -> Self { - // Without more thought, redefining the timestamp range would - // lose the old range. Asser that that cannot happen. - assert!( - self.inner.range.is_none(), - "Unexpected re-definition of timestamp range" - ); - - self.inner.range = Some(TimestampRange::new(start, end)); - self - } - - /// sets the optional timestamp range, if any - pub fn timestamp_range_option(mut self, range: Option) -> Self { - // Without more thought, redefining the timestamp range would - // lose the old range. Asser that that cannot happen. - assert!( - range.is_none() || self.inner.range.is_none(), - "Unexpected re-definition of timestamp range" - ); - self.inner.range = range; - self - } - - /// Adds an expression to the list of general purpose predicates - pub fn add_expr(mut self, expr: Expr) -> Self { - self.inner.exprs.push(expr); - self - } - - /// Builds a regex matching expression from the provided column name and - /// pattern. Values not matching the regex will be filtered out. - pub fn build_regex_match_expr(self, column: &str, pattern: impl Into) -> Self { - self.regex_match_expr(column, pattern, true) - } - - /// Builds a regex "not matching" expression from the provided column name - /// and pattern. Values *matching* the regex will be filtered out. 
- pub fn build_regex_not_match_expr(self, column: &str, pattern: impl Into) -> Self { - self.regex_match_expr(column, pattern, false) - } - - fn regex_match_expr(mut self, column: &str, pattern: impl Into, matches: bool) -> Self { - let expr = crate::regex::regex_match_expr(col(column), pattern.into(), matches); - self.inner.exprs.push(expr); - self - } - - /// Sets field_column restriction - pub fn field_columns(mut self, columns: Vec>) -> Self { - // We need to distinguish predicates like `column_name In - // (foo, bar)` and `column_name = foo and column_name = bar` in order to handle - // this - if self.inner.field_columns.is_some() { - unimplemented!("Complex/Multi field predicates are not yet supported"); - } - - let column_names = columns - .into_iter() - .map(|s| s.into()) - .collect::>(); - - self.inner.field_columns = Some(column_names); - self - } - - /// Set the partition key restriction - pub fn partition_key(mut self, partition_key: impl Into) -> Self { - assert!( - self.inner.partition_key.is_none(), - "multiple partition key predicates not suported" - ); - self.inner.partition_key = Some(partition_key.into()); - self - } - - /// Create a predicate, consuming this builder - pub fn build(self) -> Predicate { - self.inner - } - - /// Adds only the expressions from `filters` that can be pushed down to - /// execution engines. 
- pub fn add_pushdown_exprs(mut self, filters: &[Expr]) -> Self { - // For each expression of the filters, recursively split it, if it is is an AND conjunction - // For example, expression (x AND y) will be split into a vector of 2 expressions [x, y] - let mut exprs = vec![]; - filters - .iter() - .for_each(|expr| Self::split_members(expr, &mut exprs)); - - // Only keep single_column and primitive binary expressions - let mut pushdown_exprs: Vec = vec![]; - let exprs_result = exprs - .into_iter() - .try_for_each::<_, Result<_, DataFusionError>>(|expr| { - let mut columns = HashSet::new(); - utils::expr_to_columns(&expr, &mut columns)?; - - if columns.len() == 1 && Self::primitive_binary_expr(&expr) { - pushdown_exprs.push(expr); - } - Ok(()) - }); - - match exprs_result { - Ok(()) => { - // Return the builder with only the pushdownable expressions on it. - self.inner.exprs.append(&mut pushdown_exprs); - } - Err(e) => { - debug!("Error, {}, building push-down predicates for filters: {:#?}. 
No predicates are pushed down", e, filters); - } - } - - self - } - - /// Recursively split all "AND" expressions into smaller one - /// Example: "A AND B AND C" => [A, B, C] - pub fn split_members(predicate: &Expr, predicates: &mut Vec) { - match predicate { - Expr::BinaryExpr { - right, - op: Operator::And, - left, - } => { - Self::split_members(left, predicates); - Self::split_members(right, predicates); - } - other => predicates.push(other.clone()), - } - } - - /// Return true if the given expression is in a primitive binary in the form: `column op constant` - // and op must be a comparison one - pub fn primitive_binary_expr(expr: &Expr) -> bool { - match expr { - Expr::BinaryExpr { left, op, right } => { - matches!( - (&**left, &**right), - (Expr::Column(_), Expr::Literal(_)) | (Expr::Literal(_), Expr::Column(_)) - ) && matches!( - op, - Operator::Eq - | Operator::NotEq - | Operator::Lt - | Operator::LtEq - | Operator::Gt - | Operator::GtEq - ) - } - _ => false, - } - } -} - -// A representation of the `BinaryExpr` variant of a Datafusion expression. 
-#[derive(Clone, Debug, PartialEq, PartialOrd)] -pub struct BinaryExpr { - pub left: Column, - pub op: Operator, - pub right: Expr, -} - -#[cfg(test)] -mod tests { - use super::*; - use datafusion::logical_plan::{col, lit}; - - #[test] - fn test_default_predicate_is_empty() { - let p = Predicate::default(); - assert!(p.is_empty()); - } - - #[test] - fn test_non_default_predicate_is_not_empty() { - let p = PredicateBuilder::new().timestamp_range(1, 100).build(); - - assert!(!p.is_empty()); - } - - #[test] - fn test_pushdown_predicates() { - let mut filters = vec![]; - - // state = CA - let expr1 = col("state").eq(lit("CA")); - filters.push(expr1); - - // "price > 10" - let expr2 = col("price").gt(lit(10)); - filters.push(expr2); - - // a < 10 AND b >= 50 --> will be split to [a < 10, b >= 50] - let expr3 = col("a").lt(lit(10)).and(col("b").gt_eq(lit(50))); - filters.push(expr3); - - // c != 3 OR d = 8 --> won't be pushed down - let expr4 = col("c").not_eq(lit(3)).or(col("d").eq(lit(8))); - filters.push(expr4); - - // e is null --> won't be pushed down - let expr5 = col("e").is_null(); - filters.push(expr5); - - // f <= 60 - let expr6 = col("f").lt_eq(lit(60)); - filters.push(expr6); - - // g is not null --> won't be pushed down - let expr7 = col("g").is_not_null(); - filters.push(expr7); - - // h + i --> won't be pushed down - let expr8 = col("h") + col("i"); - filters.push(expr8); - - // city = Boston - let expr9 = col("city").eq(lit("Boston")); - filters.push(expr9); - - // city != Braintree - let expr9 = col("city").not_eq(lit("Braintree")); - filters.push(expr9); - - // city != state --> won't be pushed down - let expr10 = col("city").not_eq(col("state")); - filters.push(expr10); - - // city = state --> won't be pushed down - let expr11 = col("city").eq(col("state")); - filters.push(expr11); - - // city_state = city + state --> won't be pushed down - let expr12 = col("city_sate").eq(col("city") + col("state")); - filters.push(expr12); - - // city = city + 5 --> 
won't be pushed down - let expr13 = col("city").eq(col("city") + lit(5)); - filters.push(expr13); - - // city = city --> won't be pushed down - let expr14 = col("city").eq(col("city")); - filters.push(expr14); - - // city + 5 = city --> won't be pushed down - let expr15 = (col("city") + lit(5)).eq(col("city")); - filters.push(expr15); - - // 5 = city - let expr16 = lit(5).eq(col("city")); - filters.push(expr16); - - println!(" --------------- Filters: {:#?}", filters); - - // Expected pushdown predicates: [state = CA, price > 10, a < 10, b >= 50, f <= 60, city = Boston, city != Braintree, 5 = city] - let predicate = PredicateBuilder::default() - .add_pushdown_exprs(&filters) - .build(); - - println!(" ------------- Predicates: {:#?}", predicate); - assert_eq!(predicate.exprs.len(), 8); - assert_eq!(predicate.exprs[0], col("state").eq(lit("CA"))); - assert_eq!(predicate.exprs[1], col("price").gt(lit(10))); - assert_eq!(predicate.exprs[2], col("a").lt(lit(10))); - assert_eq!(predicate.exprs[3], col("b").gt_eq(lit(50))); - assert_eq!(predicate.exprs[4], col("f").lt_eq(lit(60))); - assert_eq!(predicate.exprs[5], col("city").eq(lit("Boston"))); - assert_eq!(predicate.exprs[6], col("city").not_eq(lit("Braintree"))); - assert_eq!(predicate.exprs[7], lit(5).eq(col("city"))); - } - #[test] - fn predicate_display_ts() { - // TODO make this a doc example? 
- let p = PredicateBuilder::new().timestamp_range(1, 100).build(); - - assert_eq!(p.to_string(), "Predicate range: [1 - 100]"); - } - - #[test] - fn predicate_display_ts_and_expr() { - let p = PredicateBuilder::new() - .timestamp_range(1, 100) - .add_expr(col("foo").eq(lit(42)).and(col("bar").lt(lit(11)))) - .build(); - - assert_eq!( - p.to_string(), - "Predicate range: [1 - 100] exprs: [#foo = Int32(42) AND #bar < Int32(11)]" - ); - } - - #[test] - fn predicate_display_full() { - let p = PredicateBuilder::new() - .timestamp_range(1, 100) - .add_expr(col("foo").eq(lit(42))) - .field_columns(vec!["f1", "f2"]) - .partition_key("the_key") - .build(); - - assert_eq!(p.to_string(), "Predicate field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]"); - } - - #[test] - fn test_clear_timestamp_if_max_range_out_of_range() { - let p = PredicateBuilder::new() - .timestamp_range(1, 100) - .add_expr(col("foo").eq(lit(42))) - .build(); - - let expected = p.clone(); - - // no rewrite - assert_eq!(p.clear_timestamp_if_max_range(), expected); - } - - #[test] - fn test_clear_timestamp_if_max_range_out_of_range_low() { - let p = PredicateBuilder::new() - .timestamp_range(MIN_NANO_TIME, 100) - .add_expr(col("foo").eq(lit(42))) - .build(); - - let expected = p.clone(); - - // no rewrite - assert_eq!(p.clear_timestamp_if_max_range(), expected); - } - - #[test] - fn test_clear_timestamp_if_max_range_out_of_range_high() { - let p = PredicateBuilder::new() - .timestamp_range(0, MAX_NANO_TIME) - .add_expr(col("foo").eq(lit(42))) - .build(); - - let expected = p.clone(); - - // no rewrite - assert_eq!(p.clear_timestamp_if_max_range(), expected); - } - - #[test] - fn test_clear_timestamp_if_max_range_in_range() { - let p = PredicateBuilder::new() - .timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME) - .add_expr(col("foo").eq(lit(42))) - .build(); - - let expected = PredicateBuilder::new() - .add_expr(col("foo").eq(lit(42))) - .build(); - // rewrite - 
assert_eq!(p.clear_timestamp_if_max_range(), expected); - } -} diff --git a/predicate/src/rpc_predicate.rs b/predicate/src/rpc_predicate.rs index 2004d8e1c4..e888d27e24 100644 --- a/predicate/src/rpc_predicate.rs +++ b/predicate/src/rpc_predicate.rs @@ -1,7 +1,6 @@ //! Interface logic between IOx ['Predicate`] and predicates used by the //! InfluxDB Storage gRPC API -use crate::predicate::{BinaryExpr, Predicate}; -use crate::rewrite; +use crate::{rewrite, BinaryExpr, Predicate}; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::ExecutionProps; diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 5800c73d06..0f4f4da8ce 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -18,8 +18,8 @@ use datafusion_util::AsExpr; use hashbrown::HashSet; use observability_deps::tracing::{debug, trace}; -use predicate::predicate::{BinaryExpr, Predicate, PredicateMatch}; use predicate::rpc_predicate::{InfluxRpcPredicate, FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME}; +use predicate::{BinaryExpr, Predicate, PredicateMatch}; use schema::selection::Selection; use schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; @@ -1834,7 +1834,7 @@ impl<'a> ExprRewriter for MissingColumnsToNull<'a> { #[cfg(test)] mod tests { use datafusion::logical_plan::lit; - use predicate::predicate::PredicateBuilder; + use predicate::PredicateBuilder; use schema::builder::SchemaBuilder; use crate::{ diff --git a/query/src/lib.rs b/query/src/lib.rs index 7404bb3e4f..ac37c7d758 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -16,10 +16,7 @@ use data_types::{ use datafusion::physical_plan::SendableRecordBatchStream; use exec::stringset::StringSet; use observability_deps::tracing::{debug, trace}; -use predicate::{ - predicate::{Predicate, PredicateMatch}, - rpc_predicate::QueryDatabaseMeta, -}; +use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate, 
PredicateMatch}; use schema::selection::Selection; use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME}; diff --git a/query/src/provider.rs b/query/src/provider.rs index 966f661cd1..b0c094f3ab 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -18,7 +18,7 @@ use datafusion::{ }, }; use observability_deps::tracing::{debug, trace}; -use predicate::predicate::{Predicate, PredicateBuilder}; +use predicate::{Predicate, PredicateBuilder}; use schema::{merge::SchemaMerger, sort::SortKey, Schema}; use crate::{ diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs index 5f8f6ae9b6..7025697f16 100644 --- a/query/src/provider/physical.rs +++ b/query/src/provider/physical.rs @@ -16,7 +16,7 @@ use schema::selection::Selection; use schema::Schema; use crate::QueryChunk; -use predicate::predicate::Predicate; +use predicate::Predicate; use async_trait::async_trait; diff --git a/query/src/pruning.rs b/query/src/pruning.rs index 0bb4d1c9ce..bf4f09003d 100644 --- a/query/src/pruning.rs +++ b/query/src/pruning.rs @@ -13,7 +13,7 @@ use datafusion::{ physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, }; use observability_deps::tracing::{debug, trace}; -use predicate::predicate::Predicate; +use predicate::Predicate; use schema::Schema; use crate::{group_by::Aggregate, QueryChunkMeta}; @@ -228,7 +228,7 @@ mod test { use std::{cell::RefCell, sync::Arc}; use datafusion::logical_plan::{col, lit}; - use predicate::predicate::PredicateBuilder; + use predicate::PredicateBuilder; use schema::merge::SchemaMerger; use crate::{test::TestChunk, QueryChunk}; diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs index 908c1288f3..ad91ca3a59 100644 --- a/query_tests/src/influxrpc/field_columns.rs +++ b/query_tests/src/influxrpc/field_columns.rs @@ -1,7 +1,7 @@ use arrow::datatypes::DataType; use datafusion::logical_plan::{col, lit}; -use predicate::predicate::PredicateBuilder; use 
predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ exec::fieldlist::{Field, FieldList}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index 01b00e99c1..4eaae5de73 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -13,8 +13,8 @@ use crate::{ }, }; use datafusion::logical_plan::{col, lit}; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::frontend::influxrpc::InfluxRpcPlanner; /// runs read_filter(predicate) and compares it to the expected diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 127f300154..bed008d936 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -14,8 +14,8 @@ use datafusion::{ logical_plan::{binary_expr, Operator}, prelude::*, }; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{frontend::influxrpc::InfluxRpcPlanner, group_by::Aggregate}; /// runs read_group(predicate) and compares it to the expected diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index 47f0c944e2..e7c43ddcfb 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -10,8 +10,8 @@ use async_trait::async_trait; use data_types::{delete_predicate::DeletePredicate, timestamp::TimestampRange}; use datafusion::prelude::*; use db::{test_helpers::write_lp, utils::make_db}; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ frontend::influxrpc::InfluxRpcPlanner, group_by::{Aggregate, WindowDuration}, diff --git 
a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index beef62f3e0..2fe5421d0a 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -1,7 +1,7 @@ //! Tests for the Influx gRPC queries use datafusion::logical_plan::{col, lit}; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ exec::stringset::{IntoStringSet, StringSetRef}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index 55c4497882..5dcf1f0007 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -1,6 +1,6 @@ use datafusion::logical_plan::{col, lit}; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ exec::stringset::{IntoStringSet, StringSetRef}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/query_tests/src/influxrpc/tag_values.rs b/query_tests/src/influxrpc/tag_values.rs index e64b596fd1..4f6a46299c 100644 --- a/query_tests/src/influxrpc/tag_values.rs +++ b/query_tests/src/influxrpc/tag_values.rs @@ -1,6 +1,6 @@ use datafusion::logical_plan::{col, lit}; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ exec::stringset::{IntoStringSet, StringSetRef}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/query_tests/src/pruning.rs b/query_tests/src/pruning.rs index e22c9f7a3e..9f36af7c46 100644 --- a/query_tests/src/pruning.rs +++ b/query_tests/src/pruning.rs @@ -5,8 +5,8 @@ use db::{ utils::{make_db, TestDb}, }; use metric::{Attributes, Metric, U64Counter}; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ exec::{stringset::StringSet, 
ExecutionContextProvider}, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, diff --git a/server_benchmarks/benches/read_filter.rs b/server_benchmarks/benches/read_filter.rs index 7cec9b7713..f95dd83e02 100644 --- a/server_benchmarks/benches/read_filter.rs +++ b/server_benchmarks/benches/read_filter.rs @@ -5,8 +5,8 @@ use std::io::Read; // current-thread executor use db::Db; use flate2::read::GzDecoder; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ exec::{Executor, ExecutorType}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/server_benchmarks/benches/read_group.rs b/server_benchmarks/benches/read_group.rs index 4ff62e4570..d606472b36 100644 --- a/server_benchmarks/benches/read_group.rs +++ b/server_benchmarks/benches/read_group.rs @@ -5,8 +5,8 @@ use std::io::Read; // current-thread executor use db::Db; use flate2::read::GzDecoder; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ exec::{Executor, ExecutorType}, frontend::influxrpc::InfluxRpcPlanner, diff --git a/server_benchmarks/benches/tag_values.rs b/server_benchmarks/benches/tag_values.rs index 1028d1e45d..252159a71f 100644 --- a/server_benchmarks/benches/tag_values.rs +++ b/server_benchmarks/benches/tag_values.rs @@ -5,8 +5,8 @@ use std::io::Read; // current-thread executor use db::Db; use flate2::read::GzDecoder; -use predicate::predicate::PredicateBuilder; use predicate::rpc_predicate::InfluxRpcPredicate; +use predicate::PredicateBuilder; use query::{ exec::{Executor, ExecutorType}, frontend::influxrpc::InfluxRpcPlanner,