Merge branch 'main' into crepererum/chunk_id

pull/24376/head
kodiakhq[bot] 2021-09-20 13:39:05 +00:00 committed by GitHub
commit 77d84ca5ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 165 additions and 67 deletions

View File

@ -13,6 +13,7 @@ use datafusion::{
error::DataFusionError,
logical_plan::{col, lit, Column, Expr, Operator},
optimizer::utils,
scalar::ScalarValue,
};
use datafusion_util::{make_range_expr, AndExprBuilder};
use internal_types::schema::TIME_COLUMN_NAME;
@ -162,32 +163,47 @@ impl Predicate {
self == &EMPTY_PREDICATE
}
/// Add each range [start, stop] of the delete_predicates into the predicate in
/// the form "time < start OR time > stop" to eliminate that range from the query
pub fn add_delete_ranges<S>(&mut self, delete_predicates: &[S])
/// Return a negated DF logical expression for the given delete predicates
pub fn negated_expr<S>(delete_predicates: &[S]) -> Option<Expr>
where
S: AsRef<Self>,
{
for pred in delete_predicates {
let pred = pred.as_ref();
if delete_predicates.is_empty() {
return None;
}
if let Some(range) = pred.range {
let expr = col(TIME_COLUMN_NAME)
.lt(lit(range.start))
.or(col(TIME_COLUMN_NAME).gt(lit(range.end)));
self.exprs.push(expr);
let mut pred = PredicateBuilder::default().build();
pred.merge_delete_predicates(delete_predicates);
// Make a conjunctive expression of the pred.exprs
let mut val = None;
for e in pred.exprs {
match val {
None => val = Some(e),
Some(expr) => val = Some(expr.and(e)),
}
}
val
}
/// Merge the given delete predicates into this select predicate
pub fn merge_delete_predicates<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
self.add_negated_delete_exprs(delete_predicates);
}
/// Add a list of disjunctive negated expressions.
/// Example: there are two deletes as follows
/// . Delete_1: WHERE city != "Boston" AND temp = 70
/// . Delete 2: WHERE state = "NY" AND route != "I90"
/// Example: there are two deletes as follows (note that time_range is stored separated in the Predicate
/// but we need to put it together with the exprs hee)
/// . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30)
/// . Delete 2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50)
/// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means
/// NOT(city != "Boston" AND temp = 70), NOT(state = "NY" AND route != "I90") which means
/// [NOT(city = Boston") OR NOT(temp = 70)], [NOT(state = "NY") OR NOT(route != "I90")]
pub fn add_delete_exprs<S>(&mut self, delete_predicates: &[S])
/// NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30)), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50)) which means
/// [NOT(city = Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30))], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50))]
fn add_negated_delete_exprs<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
@ -195,6 +211,26 @@ impl Predicate {
let pred = pred.as_ref();
let mut expr: Option<Expr> = None;
// Time range
if let Some(range) = pred.range {
// cast int to timestamp
// NGA todo: add in DF a function timestamp_lit(i64_val) which does lit(ScalarValue::TimestampNanosecond(Some(i64_val))
// and use it here
let ts_start = ScalarValue::TimestampNanosecond(Some(range.start));
let ts_end = ScalarValue::TimestampNanosecond(Some(range.end));
// time_expr = NOT(start <= time_range < end)
let time_expr = col(TIME_COLUMN_NAME)
.lt(lit(ts_start))
.or(col(TIME_COLUMN_NAME).gt_eq(lit(ts_end)));
match expr {
None => expr = Some(time_expr),
Some(e) => expr = Some(e.or(time_expr)),
}
}
// Exprs
for exp in &pred.exprs {
match expr {
None => expr = Some(exp.clone().not()),
@ -601,7 +637,7 @@ impl ParseDeletePredicate {
value,
quote_style: _, // all quotes are ignored as done in idpe
}) => Expr::Column(Column {
relation: Some(table_name.to_string()),
relation: None,
name: value.to_string(),
}),
_ => return false, // not a column name
@ -939,14 +975,14 @@ mod tests {
println!("{:#?}", result);
let mut expected = vec![];
let e = col("test.city").eq(lit("Boston"));
let e = col("city").eq(lit("Boston"));
expected.push(e);
let val: i64 = 100;
let e = col("test.cost").not_eq(lit(val));
let e = col("cost").not_eq(lit(val));
expected.push(e);
let e = col("test.state").not_eq(lit("MA"));
let e = col("state").not_eq(lit("MA"));
expected.push(e);
let e = col("test.temp").eq(lit(87.5));
let e = col("temp").eq(lit(87.5));
expected.push(e);
assert_eq!(result, expected)
@ -994,7 +1030,7 @@ mod tests {
let mut expected = vec![];
let num: i64 = 100;
let e = col("test.cost").not_eq(lit(num));
let e = col("cost").not_eq(lit(num));
expected.push(e);
assert_eq!(result.predicate, expected);
}

View File

@ -146,10 +146,7 @@ pub enum DeserializeError {
}
/// Deserialize IOx [`Predicate`] from a protobuf object.
pub fn deserialize(
proto_predicate: &proto::Predicate,
table_name: &str,
) -> Result<Predicate, DeserializeError> {
pub fn deserialize(proto_predicate: &proto::Predicate) -> Result<Predicate, DeserializeError> {
let predicate = Predicate {
table_names: deserialize_optional_string_set(&proto_predicate.table_names),
field_columns: deserialize_optional_string_set(&proto_predicate.field_columns),
@ -158,7 +155,7 @@ pub fn deserialize(
exprs: proto_predicate
.exprs
.iter()
.map(|expr| deserialize_expr(expr, table_name))
.map(|expr| deserialize_expr(expr))
.collect::<Result<Vec<Expr>, DeserializeError>>()?,
};
Ok(predicate)
@ -181,9 +178,9 @@ fn deserialize_timestamp_range(r: &Option<proto::TimestampRange>) -> Option<Time
})
}
fn deserialize_expr(proto_expr: &proto::Expr, table_name: &str) -> Result<Expr, DeserializeError> {
fn deserialize_expr(proto_expr: &proto::Expr) -> Result<Expr, DeserializeError> {
let column = Column {
relation: Some(table_name.to_string()),
relation: None,
name: proto_expr.column.clone(),
};
let op = deserialize_operator(&proto::Op::from_i32(proto_expr.op).context(
@ -240,7 +237,7 @@ mod tests {
let table_name = "my_table";
let predicate = delete_predicate(table_name);
let proto = serialize(&predicate).unwrap();
let recovered = deserialize(&proto, table_name).unwrap();
let recovered = deserialize(&proto).unwrap();
assert_eq!(predicate, recovered);
}

View File

@ -9,6 +9,7 @@ use datafusion::{
logical_plan::Expr,
physical_plan::{
expressions::{col as physical_col, PhysicalSortExpr},
filter::FilterExec,
projection::ProjectionExec,
sort::SortExec,
sort_preserving_merge::SortPreservingMergeExec,
@ -20,7 +21,11 @@ use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema};
use observability_deps::tracing::{debug, info, trace};
use predicate::predicate::{Predicate, PredicateBuilder};
use crate::{compute_sort_key, util::arrow_sort_key_exprs, QueryChunk};
use crate::{
compute_sort_key,
util::{arrow_sort_key_exprs, df_physical_expr},
QueryChunk,
};
use snafu::{ResultExt, Snafu};
@ -54,6 +59,11 @@ pub enum Error {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Internal error adding filter operator '{}'", source,))]
InternalFilter {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Internal error adding projection operator '{}'", source,))]
InternalProjection {
source: datafusion::error::DataFusionError,
@ -711,7 +721,19 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
}
/// Return a sort plan for for a given chunk
/// The plan will look like this
/// This plan is applied for every chunk to read data from chunk
/// The plan will look like this. Reading bottom up:
/// 1. First we scan the data in IOxReadFilterNode which represents
/// a custom implemented scan of MUB, RUB, OS. Both Select Predicate of
/// the query and Delete Predicates of the chunk is pushed down
/// here to eliminate as much data as early as possible but it is not guaranteed
/// all filters are applied because only certain expressions work
/// at this low chunk scan level.
/// Delete Predicates are tombstone of deleted data that will be eliminated at read time.
/// 2. If the chunk has Delete Predicates, the FilterExec will be added to filter data out
/// We apply delete predicate filter at this low level because the Delete Predicates are chunk specific.
/// 3. Then SortExec is added if there is a request to sort this chunk at this stage
/// See the description of function build_scan_plan to see why the sort may be needed
/// ```text
/// ┌─────────────────┐
/// │ SortExec │
@ -720,6 +742,14 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
/// ▲
/// │
/// │
/// ┌───────────────────────┐
/// │ FilterExec │
/// | To apply delete preds │
/// │ (Chunk) │
/// └───────────────────────┘
/// ▲
/// │
/// │
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
@ -729,17 +759,33 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate,
predicate: Predicate, // This is the select predicate of the query
output_sort_key: &SortKey<'_>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Create the bottom node IOxReadFilterNode for this chunk
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
let mut input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
output_schema,
vec![Arc::clone(&chunk)],
predicate,
));
// Add Filter operator, FilterExec, if the chunk has delete predicates
let del_preds = chunk.delete_predicates();
debug!(?del_preds, "Chunk delete predicates");
let negated_del_expr_val = Predicate::negated_expr(del_preds);
if let Some(negated_del_expr) = negated_del_expr_val {
debug!(?negated_del_expr, "Logical negated expressions");
let negated_physical_del_expr =
df_physical_expr(&*input, negated_del_expr).context(InternalFilter)?;
debug!(?negated_physical_del_expr, "Physical negated expressions");
input = Arc::new(
FilterExec::try_new(negated_physical_del_expr, input).context(InternalFilter)?,
);
}
// Add the sort operator, SortExec, if needed
if !output_sort_key.is_empty() {
Self::build_sort_plan(chunk, input, output_sort_key)

View File

@ -1,14 +1,18 @@
//! This module contains DataFusion utility functions and helpers
use std::collections::HashSet;
use std::{collections::HashSet, convert::TryInto, sync::Arc};
use arrow::{compute::SortOptions, datatypes::Schema as ArrowSchema, record_batch::RecordBatch};
use datafusion::{
error::DataFusionError,
logical_plan::{Expr, LogicalPlan, LogicalPlanBuilder},
logical_plan::{DFSchema, Expr, LogicalPlan, LogicalPlanBuilder},
optimizer::utils::expr_to_columns,
physical_plan::expressions::{col as physical_col, PhysicalSortExpr},
physical_plan::{
expressions::{col as physical_col, PhysicalSortExpr},
planner::DefaultPhysicalPlanner,
ExecutionPlan, PhysicalExpr,
},
};
use internal_types::schema::{sort::SortKey, Schema};
@ -70,6 +74,28 @@ pub fn arrow_sort_key_exprs(
sort_exprs
}
// Build a datafusion physical expression from its logical one
pub fn df_physical_expr(
input: &dyn ExecutionPlan,
expr: Expr,
) -> std::result::Result<Arc<dyn PhysicalExpr>, DataFusionError> {
// To create a physical expression for a logical expression we need appropriate
// PhysicalPlanner and ExecutionContextState, however, our given logical expression is very basic
// and any planner or context will work
let physical_planner = DefaultPhysicalPlanner::default();
let ctx_state = datafusion::execution::context::ExecutionContextState::new();
let input_physical_schema = input.schema();
let input_logical_schema: DFSchema = input_physical_schema.as_ref().clone().try_into()?;
physical_planner.create_physical_expr(
&expr,
&input_logical_schema,
&input_physical_schema,
&ctx_state,
)
}
#[cfg(test)]
mod tests {
use datafusion::prelude::*;

View File

@ -1237,7 +1237,7 @@ impl DbSetup for DeleteFromMubOneMeasurementOneChunk {
.build();
// delete happens when data in MUB
let _scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
let scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
// delete happens when data in MUB then moved to RUB
let scenario_rub =
@ -1257,9 +1257,7 @@ impl DbSetup for DeleteFromMubOneMeasurementOneChunk {
make_delete_mub_to_os(lp_lines.clone(), pred, table_name, partition_key).await;
// return scenarios to run queries
// NGA todo: add scenario_mub in this after the deleted data is removed in the scan
// right now MUB does not push predicate down so the result is not correct yet
vec![scenario_rub, scenario_rub_os, scenario_os]
vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os]
}
}
@ -1344,8 +1342,8 @@ impl DbSetup for DeleteFromOsOneMeasurementOneChunk {
let _scenario_os = make_delete_os(lp_lines.clone(), pred, table_name, partition_key).await;
// return scenarios to run queries
//vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os]
// NGA todo: turn the last 2 scenarios on when #2518 and #2550 are done
// NGA todo: turn these 2 OS scenarios on. May need to wait for Marco to finish persisting delete predicates first
// vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os]
vec![scenario_rub_os]
}
}
@ -1381,7 +1379,7 @@ impl DbSetup for DeleteMultiExprsFromMubOneMeasurementOneChunk {
.build();
// delete happens when data in MUB
let _scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
let scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
// delete happens when data in MUB then moved to RUB
let scenario_rub =
@ -1401,9 +1399,7 @@ impl DbSetup for DeleteMultiExprsFromMubOneMeasurementOneChunk {
make_delete_mub_to_os(lp_lines.clone(), pred, table_name, partition_key).await;
// return scenarios to run queries
// NGA todo: add scenario_mub in this after the deleted data is removed in the scan
// right now MUB does not push predicate down so the result is not correct yet
vec![scenario_rub, scenario_rub_os, scenario_os]
vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os]
}
}
@ -1503,13 +1499,13 @@ impl DbSetup for DeleteMultiExprsFromOsOneMeasurementOneChunk {
let _scenario_os = make_delete_os(lp_lines.clone(), pred, table_name, partition_key).await;
// return scenarios to run queries
// NGA todo: turn these 2 OS scenarios on. May need to wait for Marco to finish persisting delete predicates first
//vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os]
// NGA todo: turn the last 2 scenarios on when #2518band #2550 are done
vec![scenario_rub_os]
}
}
// NGA todo: Add these scenarios after deleted data is eliminated from scan
// NGA todo next PR: Add these scenarios after deleted data is eliminated from scan
// 1. Many deletes, each has one or/and multi expressions
// 2. Many different-type chunks when a delete happens
// 3. Combination of above

View File

@ -811,6 +811,8 @@ async fn sql_select_all_different_tags_chunks() {
#[tokio::test]
async fn sql_select_with_deleted_data_from_one_expr() {
test_helpers::maybe_start_logging();
let expected = vec![
"+-----+--------------------------------+",
"| bar | time |",

View File

@ -344,19 +344,11 @@ impl QueryChunk for DbChunk {
debug!(?delete_predicates, "Input Delete Predicates to read_filter");
// add negated deleted ranges to the predicate
let mut pred_with_deleted_ranges = predicate.clone();
pred_with_deleted_ranges.add_delete_ranges(delete_predicates);
// merge the negated delete predicates into the select predicate
let mut pred_with_deleted_exprs = predicate.clone();
pred_with_deleted_exprs.merge_delete_predicates(delete_predicates);
debug!(
?pred_with_deleted_ranges,
"Input Predicate plus deleted ranges"
);
// add negated deleted predicates
let mut pred_wth_deleted_exprs = pred_with_deleted_ranges.clone();
pred_wth_deleted_exprs.add_delete_exprs(delete_predicates);
debug!(
?pred_wth_deleted_exprs,
?pred_with_deleted_exprs,
"Input Predicate plus deleted ranges and deleted predicates"
);
@ -396,7 +388,7 @@ impl QueryChunk for DbChunk {
)))
}
State::ParquetFile { chunk, .. } => chunk
.read_filter(&pred_wth_deleted_exprs, selection)
.read_filter(&pred_with_deleted_exprs, selection)
.context(ParquetFileChunkError {
chunk_id: self.id(),
}),

View File

@ -65,6 +65,8 @@ pub fn move_chunk_to_read_buffer(
// Can drop and re-acquire as lifecycle action prevents concurrent modification
let mut guard = chunk.write();
// https://github.com/influxdata/influxdb_iox/issues/2546
// The chunk may have all soft deleted data. Need to handle it correctly and not throw error
let rb_chunk =
rb_chunk.expect("Chunks moving to the read buffer should have at least one row");

View File

@ -2,6 +2,7 @@ use assert_cmd::Command;
use predicates::prelude::*;
use std::time::Duration;
#[ignore]
#[tokio::test]
async fn test_logging() {
Command::cargo_bin("influxdb_iox")

View File

@ -1480,6 +1480,7 @@ async fn test_drop_partition_error() {
#[tokio::test]
async fn test_delete() {
test_helpers::maybe_start_logging();
let fixture = ServerFixture::create_shared().await;
let mut write_client = fixture.write_client();
let mut management_client = fixture.management_client();
@ -1537,11 +1538,11 @@ async fn test_delete() {
.delete(db_name.clone(), table, start, stop, pred)
.await
.unwrap();
// todo: The delete function above just parses the input, nothing deleted in DB yet
// todoL: should add different tests for different stages of chunks, too
// todo: should add different tests for different stages of chunks, too
// next PR
// query to verify data deleted
// todo: when the delete is done and integrated, the below test must fail
let mut query_results = flight_client
.perform_query(db_name, "select * from cpu")
.await
@ -1551,7 +1552,6 @@ async fn test_delete() {
"+--------+--------------------------------+------+",
"| region | time | user |",
"+--------+--------------------------------+------+",
"| west | 1970-01-01T00:00:00.000000100Z | 23.2 |",
"| west | 1970-01-01T00:00:00.000000150Z | 21 |",
"+--------+--------------------------------+------+",
];