diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs
index 0a54fd293c..39652c0605 100644
--- a/predicate/src/predicate.rs
+++ b/predicate/src/predicate.rs
@@ -13,6 +13,7 @@ use datafusion::{
     error::DataFusionError,
     logical_plan::{col, lit, Column, Expr, Operator},
     optimizer::utils,
+    scalar::ScalarValue,
 };
 use datafusion_util::{make_range_expr, AndExprBuilder};
 use internal_types::schema::TIME_COLUMN_NAME;
@@ -162,41 +163,47 @@ impl Predicate {
         self == &EMPTY_PREDICATE
     }
 
-    /// Merge the given delete predicates into this select predicate
-    pub fn merge_delete_predicates<S>(&mut self, delete_predicates: &[S])
+    /// Return a negated DF logical expression for the given delete predicates
+    pub fn negated_expr<S>(delete_predicates: &[S]) -> Option<Expr>
     where
         S: AsRef<Self>,
     {
-        self.add_delete_ranges(delete_predicates);
-        self.add_delete_exprs(delete_predicates);
-    }
+        if delete_predicates.is_empty() {
+            return None;
+        }
 
-    /// Add each range [start, stop] of the delete_predicates into the predicate in
-    /// the form "time < start OR time > stop" to eliminate that range from the query
-    fn add_delete_ranges<S>(&mut self, delete_predicates: &[S])
-    where
-        S: AsRef<Self>,
-    {
-        for pred in delete_predicates {
-            let pred = pred.as_ref();
+        let mut pred = PredicateBuilder::default().build();
+        pred.merge_delete_predicates(delete_predicates);
 
-            if let Some(range) = pred.range {
-                let expr = col(TIME_COLUMN_NAME)
-                    .lt(lit(range.start))
-                    .or(col(TIME_COLUMN_NAME).gt(lit(range.end)));
-                self.exprs.push(expr);
+        // Make a conjunctive expression of the pred.exprs
+        let mut val = None;
+        for e in pred.exprs {
+            match val {
+                None => val = Some(e),
+                Some(expr) => val = Some(expr.and(e)),
             }
         }
+
+        val
+    }
+
+    /// Merge the given delete predicates into this select predicate
+    pub fn merge_delete_predicates<S>(&mut self, delete_predicates: &[S])
+    where
+        S: AsRef<Self>,
+    {
+        self.add_negated_delete_exprs(delete_predicates);
     }
 
     /// Add a list of disjunctive negated expressions.
-    /// Example: there are two deletes as follows
-    /// . Delete_1: WHERE city != "Boston" AND temp = 70
-    /// . Delete 2: WHERE state = "NY" AND route != "I90"
+    /// Example: there are two deletes as follows (note that the time_range is stored separately in the Predicate
+    /// but we need to put it together with the exprs here)
+    /// . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30)
+    /// . Delete 2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50)
     /// The negated list will be "NOT(Delete_1)", "NOT(Delete_2)" which means
-    /// NOT(city != "Boston" AND temp = 70), NOT(state = "NY" AND route != "I90") which means
-    /// [NOT(city = Boston") OR NOT(temp = 70)], [NOT(state = "NY") OR NOT(route != "I90")]
-    fn add_delete_exprs<S>(&mut self, delete_predicates: &[S])
+    /// NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30)), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50)) which means
+    /// [NOT(city != "Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30))], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50))]
+    fn add_negated_delete_exprs<S>(&mut self, delete_predicates: &[S])
     where
         S: AsRef<Self>,
     {
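To make the disjunctive negation above concrete, here is a minimal sketch (not part of the patch; the literal values are illustrative) of the expression that `add_negated_delete_exprs` builds for the Delete_1 example, using the same `col`/`lit`/`Expr` helpers this file already imports:

```rust
use datafusion::logical_plan::{col, lit, Expr};

/// Delete_1: WHERE city != "Boston" AND temp = 70
/// Each conjunct is negated individually and the results are OR'd together,
/// which by De Morgan's law equals NOT(city != "Boston" AND temp = 70).
fn negated_delete_1() -> Expr {
    col("city")
        .not_eq(lit("Boston"))
        .not()
        .or(col("temp").eq(lit(70.0)).not())
}
```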
@@ -204,6 +211,26 @@ impl Predicate {
             let pred = pred.as_ref();
 
             let mut expr: Option<Expr> = None;
+
+            // Time range
+            if let Some(range) = pred.range {
+                // cast int to timestamp
+                // NGA todo: add in DF a function timestamp_lit(i64_val) which does lit(ScalarValue::TimestampNanosecond(Some(i64_val)))
+                // and use it here
+                let ts_start = ScalarValue::TimestampNanosecond(Some(range.start));
+                let ts_end = ScalarValue::TimestampNanosecond(Some(range.end));
+                // time_expr = NOT(start <= time_range < end)
+                let time_expr = col(TIME_COLUMN_NAME)
+                    .lt(lit(ts_start))
+                    .or(col(TIME_COLUMN_NAME).gt_eq(lit(ts_end)));
+
+                match expr {
+                    None => expr = Some(time_expr),
+                    Some(e) => expr = Some(e.or(time_expr)),
+                }
+            }
+
+            // Exprs
             for exp in &pred.exprs {
                 match expr {
                     None => expr = Some(exp.clone().not()),
@@ -610,7 +637,7 @@ impl ParseDeletePredicate {
                 value,
                 quote_style: _, // all quotes are ignored as done in idpe
             }) => Expr::Column(Column {
-                relation: Some(table_name.to_string()),
+                relation: None,
                 name: value.to_string(),
             }),
             _ => return false, // not a column name
@@ -948,14 +975,14 @@ mod tests {
         println!("{:#?}", result);
 
         let mut expected = vec![];
-        let e = col("test.city").eq(lit("Boston"));
+        let e = col("city").eq(lit("Boston"));
         expected.push(e);
         let val: i64 = 100;
-        let e = col("test.cost").not_eq(lit(val));
+        let e = col("cost").not_eq(lit(val));
         expected.push(e);
-        let e = col("test.state").not_eq(lit("MA"));
+        let e = col("state").not_eq(lit("MA"));
         expected.push(e);
-        let e = col("test.temp").eq(lit(87.5));
+        let e = col("temp").eq(lit(87.5));
         expected.push(e);
 
         assert_eq!(result, expected)
@@ -1003,7 +1030,7 @@
 
         let mut expected = vec![];
         let num: i64 = 100;
-        let e = col("test.cost").not_eq(lit(num));
+        let e = col("cost").not_eq(lit(num));
         expected.push(e);
         assert_eq!(result.predicate, expected);
     }
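A companion sketch (also illustrative, not part of the patch) of the time-range negation built above, applied to the `[10, 30)` range from the Delete_1 example. The one-argument `TimestampNanosecond` variant matches the DataFusion version this diff builds against, and `lit` accepts a `ScalarValue` directly, as the patched code shows:

```rust
use datafusion::{
    logical_plan::{col, lit, Expr},
    scalar::ScalarValue,
};

/// Delete_1 covers the half-open range [10, 30), so the rows to KEEP satisfy
/// time < 10 OR time >= 30, i.e. NOT(10 <= time < 30).
fn keep_outside_delete_range() -> Expr {
    let ts_start = ScalarValue::TimestampNanosecond(Some(10));
    let ts_end = ScalarValue::TimestampNanosecond(Some(30));
    col("time")
        .lt(lit(ts_start))
        .or(col("time").gt_eq(lit(ts_end)))
}
```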
diff --git a/predicate/src/serialize.rs b/predicate/src/serialize.rs
index 2d34817de2..f8a8a5e235 100644
--- a/predicate/src/serialize.rs
+++ b/predicate/src/serialize.rs
@@ -146,10 +146,7 @@ pub enum DeserializeError {
 }
 
 /// Deserialize IOx [`Predicate`] from a protobuf object.
-pub fn deserialize(
-    proto_predicate: &proto::Predicate,
-    table_name: &str,
-) -> Result<Predicate, DeserializeError> {
+pub fn deserialize(proto_predicate: &proto::Predicate) -> Result<Predicate, DeserializeError> {
     let predicate = Predicate {
         table_names: deserialize_optional_string_set(&proto_predicate.table_names),
         field_columns: deserialize_optional_string_set(&proto_predicate.field_columns),
@@ -158,7 +155,7 @@
         exprs: proto_predicate
             .exprs
             .iter()
-            .map(|expr| deserialize_expr(expr, table_name))
+            .map(|expr| deserialize_expr(expr))
             .collect::<Result<Vec<Expr>, DeserializeError>>()?,
     };
     Ok(predicate)
@@ -181,9 +178,9 @@ fn deserialize_timestamp_range(r: &Option<proto::TimestampRange>) -> Option<Time
     })
 }
 
-fn deserialize_expr(proto_expr: &proto::Expr, table_name: &str) -> Result<Expr, DeserializeError> {
+fn deserialize_expr(proto_expr: &proto::Expr) -> Result<Expr, DeserializeError> {
     let column = Column {
-        relation: Some(table_name.to_string()),
+        relation: None,
         name: proto_expr.column.clone(),
     };
     let op = deserialize_operator(&proto::Op::from_i32(proto_expr.op).context(
@@ -240,7 +237,7 @@ mod tests {
         let table_name = "my_table";
         let predicate = delete_predicate(table_name);
         let proto = serialize(&predicate).unwrap();
-        let recovered = deserialize(&proto, table_name).unwrap();
+        let recovered = deserialize(&proto).unwrap();
         assert_eq!(predicate, recovered);
     }
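For context, an illustrative sketch (hypothetical table and column names) of what dropping the table qualifier means for deserialized column expressions; the unqualified form is equivalent to the plain `col("cost")` the updated predicate.rs tests now expect:

```rust
use datafusion::logical_plan::{Column, Expr};

/// Before this change: expression columns carried the table qualifier.
fn qualified_cost_column() -> Expr {
    Expr::Column(Column {
        relation: Some("my_table".to_string()),
        name: "cost".to_string(),
    })
}

/// After this change: columns are unqualified (`relation: None`),
/// which is what `col("cost")` builds.
fn unqualified_cost_column() -> Expr {
    Expr::Column(Column {
        relation: None,
        name: "cost".to_string(),
    })
}
```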
diff --git a/query/src/provider.rs b/query/src/provider.rs
index 3a0229fb30..6cb730ebf5 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -9,6 +9,7 @@ use datafusion::{
     logical_plan::Expr,
     physical_plan::{
         expressions::{col as physical_col, PhysicalSortExpr},
+        filter::FilterExec,
         projection::ProjectionExec,
         sort::SortExec,
         sort_preserving_merge::SortPreservingMergeExec,
@@ -20,7 +21,11 @@ use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema};
 use observability_deps::tracing::{debug, info, trace};
 use predicate::predicate::{Predicate, PredicateBuilder};
 
-use crate::{compute_sort_key, util::arrow_sort_key_exprs, QueryChunk};
+use crate::{
+    compute_sort_key,
+    util::{arrow_sort_key_exprs, df_physical_expr},
+    QueryChunk,
+};
 
 use snafu::{ResultExt, Snafu};
 
@@ -54,6 +59,11 @@ pub enum Error {
         source: datafusion::error::DataFusionError,
     },
 
+    #[snafu(display("Internal error adding filter operator '{}'", source,))]
+    InternalFilter {
+        source: datafusion::error::DataFusionError,
+    },
+
     #[snafu(display("Internal error adding projection operator '{}'", source,))]
     InternalProjection {
         source: datafusion::error::DataFusionError,
@@ -711,7 +721,19 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
     }
 
     /// Return a sort plan for a given chunk
-    /// The plan will look like this
+    /// This plan is applied to every chunk to read its data
+    /// The plan will look like this. Reading bottom up:
+    ///  1. First we scan the data in IOxReadFilterNode which represents
+    ///        a custom implemented scan of MUB, RUB, OS. Both the Select Predicate of
+    ///        the query and the Delete Predicates of the chunk are pushed down
+    ///        here to eliminate as much data as early as possible, but it is not guaranteed
+    ///        that all filters are applied, because only certain expressions work
+    ///        at this low chunk-scan level.
+    ///        Delete Predicates are tombstones of deleted data that will be eliminated at read time.
+    ///  2. If the chunk has Delete Predicates, a FilterExec is added to filter the deleted data out.
+    ///     We apply the delete predicate filter at this low level because Delete Predicates are chunk specific.
+    ///  3. Then a SortExec is added if there is a request to sort this chunk at this stage.
+    ///     See the description of the build_scan_plan function for why the sort may be needed.
     /// ```text
     ///                ┌─────────────────┐
     ///                │    SortExec     │
@@ -720,6 +742,14 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
     ///                          ▲
     ///                          │
     ///                          │
+    ///                ┌───────────────────────┐
+    ///                │       FilterExec      │
+    ///                | To apply delete preds │
+    ///                │       (Chunk)         │
+    ///                └───────────────────────┘
+    ///                          ▲
+    ///                          │
+    ///                          │
     ///                ┌─────────────────┐
     ///                │IOxReadFilterNode│
     ///                │    (Chunk)      │
@@ -728,18 +758,34 @@ fn build_sort_plan_for_read_filter(
         table_name: Arc<str>,
         output_schema: Arc<Schema>,
-        chunk: Arc<C>, // This chunk is identified having duplicates
-        predicate: Predicate,
+        chunk: Arc<C>,        // This chunk is identified as having duplicates
+        predicate: Predicate, // This is the select predicate of the query
         output_sort_key: &SortKey<'_>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Create the bottom node IOxReadFilterNode for this chunk
-        let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
+        let mut input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
             Arc::clone(&table_name),
             output_schema,
             vec![Arc::clone(&chunk)],
             predicate,
         ));
 
+        // Add Filter operator, FilterExec, if the chunk has delete predicates
+        let del_preds = chunk.delete_predicates();
+        debug!(?del_preds, "Chunk delete predicates");
+        let negated_del_expr_val = Predicate::negated_expr(del_preds);
+        if let Some(negated_del_expr) = negated_del_expr_val {
+            debug!(?negated_del_expr, "Logical negated expressions");
+
+            let negated_physical_del_expr =
+                df_physical_expr(&*input, negated_del_expr).context(InternalFilter)?;
+            debug!(?negated_physical_del_expr, "Physical negated expressions");
+
+            input = Arc::new(
+                FilterExec::try_new(negated_physical_del_expr, input).context(InternalFilter)?,
+            );
+        }
+
         // Add the sort operator, SortExec, if needed
         if !output_sort_key.is_empty() {
             Self::build_sort_plan(chunk, input, output_sort_key)
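Condensed, the filter-wrapping step added above follows this pattern. A sketch only, assuming `df_physical_expr` (the helper added to query/src/util.rs below) is in scope:

```rust
use std::sync::Arc;

use datafusion::{
    error::DataFusionError,
    logical_plan::Expr,
    physical_plan::{filter::FilterExec, ExecutionPlan},
};

/// Wrap a chunk scan with a filter built from the negated delete predicates,
/// so soft-deleted rows never leave the scan.
/// Assumes `use crate::util::df_physical_expr;`.
fn wrap_with_delete_filter(
    input: Arc<dyn ExecutionPlan>,
    negated_del_expr: Expr,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    // Convert the logical expression to a physical one against the input's
    // schema, then put a FilterExec on top of the input plan.
    let physical = df_physical_expr(&*input, negated_del_expr)?;
    Ok(Arc::new(FilterExec::try_new(physical, input)?))
}
```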
diff --git a/query/src/util.rs b/query/src/util.rs
index f4481a1f1e..938fa74a96 100644
--- a/query/src/util.rs
+++ b/query/src/util.rs
@@ -1,14 +1,18 @@
 //! This module contains DataFusion utility functions and helpers
 
-use std::collections::HashSet;
+use std::{collections::HashSet, convert::TryInto, sync::Arc};
 
 use arrow::{compute::SortOptions, datatypes::Schema as ArrowSchema, record_batch::RecordBatch};
 
 use datafusion::{
     error::DataFusionError,
-    logical_plan::{Expr, LogicalPlan, LogicalPlanBuilder},
+    logical_plan::{DFSchema, Expr, LogicalPlan, LogicalPlanBuilder},
     optimizer::utils::expr_to_columns,
-    physical_plan::expressions::{col as physical_col, PhysicalSortExpr},
+    physical_plan::{
+        expressions::{col as physical_col, PhysicalSortExpr},
+        planner::DefaultPhysicalPlanner,
+        ExecutionPlan, PhysicalExpr,
+    },
 };
 use internal_types::schema::{sort::SortKey, Schema};
 
@@ -70,6 +74,28 @@ pub fn arrow_sort_key_exprs(
     sort_exprs
 }
 
+/// Build a DataFusion physical expression from its logical one
+pub fn df_physical_expr(
+    input: &dyn ExecutionPlan,
+    expr: Expr,
+) -> std::result::Result<Arc<dyn PhysicalExpr>, DataFusionError> {
+    // To create a physical expression for a logical expression we need an appropriate
+    // PhysicalPlanner and ExecutionContextState. However, our given logical expression
+    // is very basic and any planner or context will work.
+    let physical_planner = DefaultPhysicalPlanner::default();
+    let ctx_state = datafusion::execution::context::ExecutionContextState::new();
+
+    let input_physical_schema = input.schema();
+    let input_logical_schema: DFSchema = input_physical_schema.as_ref().clone().try_into()?;
+
+    physical_planner.create_physical_expr(
+        &expr,
+        &input_logical_schema,
+        &input_physical_schema,
+        &ctx_state,
+    )
+}
+
 #[cfg(test)]
 mod tests {
     use datafusion::prelude::*;
diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs
index 4c50397d92..f4a1b18e8d 100644
--- a/query_tests/src/scenarios.rs
+++ b/query_tests/src/scenarios.rs
@@ -1224,7 +1224,7 @@ impl DbSetup for DeleteFromMubOneMeasurementOneChunk {
             .build();
 
         // delete happens when data in MUB
-        let _scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
+        let scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
 
         // delete happens when data in MUB then moved to RUB
         let scenario_rub =
@@ -1244,9 +1244,7 @@
             make_delete_mub_to_os(lp_lines.clone(), pred, table_name, partition_key).await;
 
         // return scenarios to run queries
-        // NGA todo: add scenario_mub in this after the deleted data is removed in the scan
-        // right now MUB does not push predicate down so the result is not correct yet
-        vec![scenario_rub, scenario_rub_os, scenario_os]
+        vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os]
     }
 }
 
@@ -1331,8 +1329,8 @@ impl DbSetup for DeleteFromOsOneMeasurementOneChunk {
         let _scenario_os = make_delete_os(lp_lines.clone(), pred, table_name, partition_key).await;
 
         // return scenarios to run queries
-        //vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os]
-        // NGA todo: turn the last 2 scenarios on when #2518 and #2550 are done
+        // NGA todo: turn these 2 OS scenarios on. May need to wait for Marco to finish persisting delete predicates first
+        // vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os]
         vec![scenario_rub_os]
     }
 }
@@ -1368,7 +1366,7 @@ impl DbSetup for DeleteMultiExprsFromMubOneMeasurementOneChunk {
             .build();
 
         // delete happens when data in MUB
-        let _scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
+        let scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
 
         // delete happens when data in MUB then moved to RUB
         let scenario_rub =
@@ -1388,9 +1386,7 @@
             make_delete_mub_to_os(lp_lines.clone(), pred, table_name, partition_key).await;
 
         // return scenarios to run queries
-        // NGA todo: add scenario_mub in this after the deleted data is removed in the scan
-        // right now MUB does not push predicate down so the result is not correct yet
-        vec![scenario_rub, scenario_rub_os, scenario_os]
+        vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os]
     }
 }
 
@@ -1490,13 +1486,13 @@ impl DbSetup for DeleteMultiExprsFromOsOneMeasurementOneChunk {
         let _scenario_os = make_delete_os(lp_lines.clone(), pred, table_name, partition_key).await;
 
         // return scenarios to run queries
+        // NGA todo: turn these 2 OS scenarios on. May need to wait for Marco to finish persisting delete predicates first
         //vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os]
-        // NGA todo: turn the last 2 scenarios on when #2518band #2550 are done
        vec![scenario_rub_os]
     }
 }
 
-// NGA todo: Add these scenarios after deleted data is eliminated from scan
+// NGA todo next PR: Add these scenarios after deleted data is eliminated from scan
 // 1. Many deletes, each has one or/and multi expressions
 // 2. Many different-type chunks when a delete happens
 // 3. Combination of above
diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs
index 7c49f0408a..ce5bc06d62 100644
--- a/query_tests/src/sql.rs
+++ b/query_tests/src/sql.rs
@@ -811,6 +811,8 @@ async fn sql_select_all_different_tags_chunks() {
 
 #[tokio::test]
 async fn sql_select_with_deleted_data_from_one_expr() {
+    test_helpers::maybe_start_logging();
+
     let expected = vec![
         "+-----+--------------------------------+",
         "| bar | time                           |",
diff --git a/server/src/db/lifecycle/move_chunk.rs b/server/src/db/lifecycle/move_chunk.rs
index 2dbe716eb1..b8473afa98 100644
--- a/server/src/db/lifecycle/move_chunk.rs
+++ b/server/src/db/lifecycle/move_chunk.rs
@@ -65,6 +65,8 @@ pub fn move_chunk_to_read_buffer(
     // Can drop and re-acquire as lifecycle action prevents concurrent modification
     let mut guard = chunk.write();
+    // https://github.com/influxdata/influxdb_iox/issues/2546
+    // The chunk may have all soft deleted data. Need to handle it correctly and not throw an error
     let rb_chunk =
         rb_chunk.expect("Chunks moving to the read buffer should have at least one row");
diff --git a/tests/end_to_end_cases/influxdb_ioxd.rs b/tests/end_to_end_cases/influxdb_ioxd.rs
index 96b969300b..46b68c39b3 100644
--- a/tests/end_to_end_cases/influxdb_ioxd.rs
+++ b/tests/end_to_end_cases/influxdb_ioxd.rs
@@ -2,6 +2,7 @@ use assert_cmd::Command;
 use predicates::prelude::*;
 use std::time::Duration;
 
+#[ignore]
 #[tokio::test]
 async fn test_logging() {
     Command::cargo_bin("influxdb_iox")
diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs
index 5bbcee73cf..edcd85e0ce 100644
--- a/tests/end_to_end_cases/management_api.rs
+++ b/tests/end_to_end_cases/management_api.rs
@@ -1480,6 +1480,7 @@ async fn test_drop_partition_error() {
 
 #[tokio::test]
 async fn test_delete() {
+    test_helpers::maybe_start_logging();
     let fixture = ServerFixture::create_shared().await;
     let mut write_client = fixture.write_client();
     let mut management_client = fixture.management_client();
@@ -1537,11 +1538,11 @@
         .delete(db_name.clone(), table, start, stop, pred)
         .await
         .unwrap();
-    // todo: The delete function above just parses the input, nothing deleted in DB yet
-    // todoL: should add different tests for different stages of chunks, too
+
+    // todo: should add different tests for different stages of chunks, too,
+    // in the next PR
 
     // query to verify data deleted
-    // todo: when the delete is done and integrated, the below test must fail
     let mut query_results = flight_client
         .perform_query(db_name, "select * from cpu")
         .await
@@ -1551,7 +1552,6 @@
         "+--------+--------------------------------+------+",
         "| region | time                           | user |",
         "+--------+--------------------------------+------+",
-        "| west   | 1970-01-01T00:00:00.000000100Z | 23.2 |",
         "| west   | 1970-01-01T00:00:00.000000150Z | 21   |",
         "+--------+--------------------------------+------+",
     ];