diff --git a/parquet_file/src/catalog/cleanup.rs b/parquet_file/src/catalog/cleanup.rs index 0f48ad87b0..7725ddeaa9 100644 --- a/parquet_file/src/catalog/cleanup.rs +++ b/parquet_file/src/catalog/cleanup.rs @@ -6,7 +6,7 @@ use iox_object_store::{IoxObjectStore, ParquetFilePath}; use object_store::{ObjectStore, ObjectStoreApi}; use observability_deps::tracing::info; use parking_lot::Mutex; -use predicate::predicate::Predicate; +use predicate::delete_predicate::DeletePredicate; use snafu::{ResultExt, Snafu}; use crate::catalog::{ @@ -148,7 +148,7 @@ impl CatalogState for TracerCatalogState { fn delete_predicate( &mut self, - _predicate: Arc<Predicate>, + _predicate: Arc<DeletePredicate>, _chunks: Vec<ChunkAddrWithoutDatabase>, ) { // No need to track delete predicates, because the cleanup's job is to remove unreferenced parquet files. Delete diff --git a/parquet_file/src/catalog/core.rs b/parquet_file/src/catalog/core.rs index 0e07bec9a9..62b6eb66ab 100644 --- a/parquet_file/src/catalog/core.rs +++ b/parquet_file/src/catalog/core.rs @@ -20,7 +20,7 @@ use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath}; use object_store::{ObjectStore, ObjectStoreApi}; use observability_deps::tracing::{info, warn}; use parking_lot::RwLock; -use predicate::predicate::Predicate; +use predicate::delete_predicate::DeletePredicate; use snafu::{OptionExt, ResultExt, Snafu}; use std::{ collections::{ @@ -147,11 +147,6 @@ pub enum Error { - #[snafu(display("Cannot serialize predicate: {}", source))] - CannotSerializePredicate { - source: predicate::serialize::SerializeError, - }, - #[snafu(display("Delete predicate missing"))] DeletePredicateMissing, @@ -860,7 +855,7 @@ impl<'c> TransactionHandle<'c> { /// Register new delete predicate. pub fn delete_predicate( &mut self, - predicate: &Predicate, + predicate: &DeletePredicate, chunks: &[ChunkAddrWithoutDatabase], ) -> Result<()> { self.transaction @@ -868,10 +863,7 @@ .expect("transaction handle w/o transaction?!") .record_action(proto::transaction::action::Action::DeletePredicate( proto::DeletePredicate { - predicate: Some( - predicate::serialize::serialize(predicate) - .context(CannotSerializePredicate)?, - ), + predicate: Some(predicate::serialize::serialize(predicate)), chunks: chunks .iter() .map(|chunk| proto::ChunkAddr { @@ -992,16 +984,13 @@ impl<'c> CheckpointHandle<'c> { } fn create_actions_for_delete_predicates( - delete_predicates: Vec<(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)>, + delete_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>, ) -> Result<Vec<proto::transaction::action::Action>, Error> { delete_predicates .into_iter() .map(|(predicate, chunks)| { let action = proto::DeletePredicate { - predicate: Some( - predicate::serialize::serialize(&predicate) - .context(CannotSerializePredicate)?, - ), + predicate: Some(predicate::serialize::serialize(&predicate)), chunks: chunks .iter() .map(|chunk| proto::ChunkAddr { diff --git a/parquet_file/src/catalog/interface.rs b/parquet_file/src/catalog/interface.rs index 8fe84b52b3..8570a1ba8b 100644 --- a/parquet_file/src/catalog/interface.rs +++ b/parquet_file/src/catalog/interface.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use data_types::chunk_metadata::{ChunkAddr, ChunkId}; use iox_object_store::{IoxObjectStore, ParquetFilePath}; -use predicate::predicate::Predicate; +use predicate::delete_predicate::DeletePredicate; use snafu::Snafu; use crate::metadata::IoxParquetMetaData; @@ -118,7 +118,7 @@ pub trait CatalogState { /// The delete predicate will only be applied to the given chunks (by table name, partition key,
and chunk ID). fn delete_predicate( &mut self, - predicate: Arc<Predicate>, + predicate: Arc<DeletePredicate>, chunks: Vec<ChunkAddrWithoutDatabase>, ); } @@ -141,6 +141,6 @@ pub struct CheckpointData { /// This must only contains chunks that are still present in the catalog. Predicates that do not have any chunks /// attached should be left out. /// - /// The vector itself must be sorted by [`Predicate`]. The chunks list must also be sorted. - pub delete_predicates: Vec<(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)>, + /// The vector itself must be sorted by [`DeletePredicate`]. The chunks list must also be sorted. + pub delete_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>, } diff --git a/parquet_file/src/catalog/test_helpers.rs b/parquet_file/src/catalog/test_helpers.rs index aff7289528..6d19b8e351 100644 --- a/parquet_file/src/catalog/test_helpers.rs +++ b/parquet_file/src/catalog/test_helpers.rs @@ -7,9 +7,12 @@ use std::{ sync::Arc, }; -use data_types::chunk_metadata::ChunkId; +use data_types::{chunk_metadata::ChunkId, timestamp::TimestampRange}; use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath}; -use predicate::predicate::Predicate; +use predicate::{ + delete_expr::{DeleteExpr, Op, Scalar}, + delete_predicate::DeletePredicate, +}; use snafu::ResultExt; use crate::{ @@ -41,7 +44,7 @@ pub struct Partition { #[derive(Clone, Debug)] pub struct Chunk { pub parquet_info: CatalogParquetInfo, - pub delete_predicates: Vec<Arc<Predicate>>, + pub delete_predicates: Vec<Arc<DeletePredicate>>, } /// In-memory catalog state, for testing. @@ -74,16 +77,16 @@ impl TestCatalogState { } /// Return an iterator over all predicates in this catalog. - pub fn delete_predicates(&self) -> Vec<(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)> { - let mut predicates: HashMap<usize, (Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)> = + pub fn delete_predicates(&self) -> Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> { + let mut predicates: HashMap<usize, (Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> = Default::default(); for (table_name, table) in &self.tables { for (partition_key, partition) in &table.partitions { for (chunk_id, chunk) in &partition.chunks { for predicate in &chunk.delete_predicates { - let predicate_ref: &Predicate = predicate.as_ref(); - let addr = (predicate_ref as *const Predicate) as usize; + let predicate_ref: &DeletePredicate = predicate.as_ref(); + let addr = (predicate_ref as *const DeletePredicate) as usize; let pred_chunk_closure = || ChunkAddrWithoutDatabase { table_name: Arc::clone(table_name), partition_key: Arc::clone(partition_key), @@ -197,7 +200,7 @@ impl CatalogState for TestCatalogState { fn delete_predicate( &mut self, - predicate: Arc<Predicate>, + predicate: Arc<DeletePredicate>, chunks: Vec<ChunkAddrWithoutDatabase>, ) { for addr in chunks { @@ -256,7 +259,8 @@ where // The expected state of the catalog let mut expected_files: HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)> = HashMap::new(); - let mut expected_predicates: Vec<(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)> = vec![]; + let mut expected_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> = + vec![]; assert_checkpoint(&state, &f, &expected_files, &expected_predicates); // add files @@ -591,7 +595,7 @@ fn assert_checkpoint<S, F>( state: &S, f: &F, expected_files: &HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)>, - expected_predicates: &[(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)], + expected_predicates: &[(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)], ) where F: Fn(&S) -> CheckpointData, { @@ -634,28 +638,20 @@ fn get_sorted_keys<'a>( } /// Helper to create a simple delete predicate.
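
The reworked helper below returns a structured `DeletePredicate` instead of a DataFusion-based `Predicate`. A minimal sketch of what callers now get back, based on the literals in the new body (illustrative, not part of the patch):

```rust
// Hypothetical use of the rewritten test helper; DeleteExpr, Op, Scalar, and
// TimestampRange are imported at the top of test_helpers.rs in this diff.
let pred = create_delete_predicate("my_table", 42);
assert_eq!(pred.range, TimestampRange { start: 11, end: 22 });
assert_eq!(
    pred.exprs,
    vec![DeleteExpr::new("foo".to_string(), Op::Eq, Scalar::I64(42))]
);
```
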
-pub fn create_delete_predicate(table_name: &str, value: i64) -> Arc<Predicate> { - use datafusion::{ - logical_plan::{Column, Expr, Operator}, - scalar::ScalarValue, - }; - - Arc::new(Predicate { +pub fn create_delete_predicate(table_name: &str, value: i64) -> Arc<DeletePredicate> { + Arc::new(DeletePredicate { table_names: Some( IntoIterator::into_iter([table_name.to_string(), format!("not_{}", table_name)]) .collect(), ), field_columns: None, partition_key: None, - range: None, - exprs: vec![Expr::BinaryExpr { - left: Box::new(Expr::Column(Column { - relation: None, - name: "foo".to_string(), - })), - op: Operator::Eq, - right: Box::new(Expr::Literal(ScalarValue::Int64(Some(value)))), - }], + range: TimestampRange { start: 11, end: 22 }, + exprs: vec![DeleteExpr::new( + "foo".to_string(), + Op::Eq, + Scalar::I64(value), + )], }) } diff --git a/predicate/src/delete_expr.rs b/predicate/src/delete_expr.rs index deaca23547..d6b5bf9e89 100644 --- a/predicate/src/delete_expr.rs +++ b/predicate/src/delete_expr.rs @@ -23,6 +23,10 @@ pub struct DeleteExpr { } impl DeleteExpr { + pub fn new(column: String, op: Op, scalar: Scalar) -> Self { + Self { column, op, scalar } + } + /// Column (w/o table name). pub fn column(&self) -> &str { &self.column diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs new file mode 100644 index 0000000000..02855368ff --- /dev/null +++ b/predicate/src/delete_predicate.rs @@ -0,0 +1,736 @@ +use std::{collections::BTreeSet, convert::TryInto}; + +use chrono::DateTime; +use data_types::timestamp::TimestampRange; +use datafusion::logical_plan::{lit, Column, Expr, Operator}; +use snafu::{ResultExt, Snafu}; +use sqlparser::{ + ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value}, + dialect::GenericDialect, + parser::Parser, +}; + +use crate::delete_expr::DeleteExpr; + +const FLUX_TABLE: &str = "_measurement"; + +/// Parse Delete Predicates +/// Parse Error +#[derive(Debug, Snafu)] +pub enum Error { + /// Invalid time format + #[snafu(display("Invalid timestamp: {}", value))] + InvalidTimestamp { value: String }, + + /// Invalid time range + #[snafu(display("Invalid time range: ({}, {})", start, stop))] + InvalidTimeRange { start: String, stop: String }, + + /// Predicate syntax error + #[snafu(display("Invalid predicate syntax: ({})", value))] + InvalidSyntax { value: String }, + + /// Predicate semantics error + #[snafu(display("Invalid predicate semantics: ({})", value))] + InvalidSemantics { value: String }, + + /// Predicate includes an unsupported expression + #[snafu(display("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_name != literal': ({})", value))] + NotSupportPredicate { value: String }, + + #[snafu(display(r#"Unable to parse delete string '{}'"#, value))] + DeleteInvalid { + source: serde_json::Error, + value: String, + }, + + #[snafu(display( + r#"Invalid key which is either 'start', 'stop', or 'predicate': '{}'"#, + value + ))] + DeleteKeywordInvalid { value: String }, + + #[snafu(display(r#"Invalid timestamp or predicate value: '{}'"#, value))] + DeleteValueInvalid { value: String }, + + #[snafu(display(r#"Invalid JSON format of delete string '{}'"#, value))] + DeleteObjectInvalid { value: String }, + + #[snafu(display(r#"Invalid table name in delete '{}'"#, value))] + DeleteTableInvalid { value: String }, + + #[snafu(display(r#"Delete must include a start time and a stop time: '{}'"#, value))] + DeleteStartStopInvalid { value: String }, +} + +/// Result type for Parser Client +pub type Result<T, E = Error> =
std::result::Result<T, E>; + +/// Represents a parsed delete predicate for evaluation by the InfluxDB IOx +/// query engine. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct DeletePredicate { + /// Optional table restriction. If present, restricts the results + /// to only tables whose names are in `table_names` + pub table_names: Option<BTreeSet<String>>, + + /// Optional field restriction. If present, restricts the results to only + /// tables which have *at least one* of the fields in field_columns. + pub field_columns: Option<BTreeSet<String>>, + + /// Optional partition key filter + pub partition_key: Option<String>, + + /// Only rows within this range are included in + /// results. Other rows are excluded. + pub range: TimestampRange, + + /// Optional arbitrary predicates, represented as a list of + /// expressions applied as a logical conjunction (aka they + /// are 'AND'ed together). Only rows that evaluate to TRUE for all + /// these expressions should be returned. Other rows are excluded + /// from the results. + pub exprs: Vec<DeleteExpr>, +} + +impl From<DeletePredicate> for crate::predicate::Predicate { + fn from(pred: DeletePredicate) -> Self { + Self { + table_names: pred.table_names, + field_columns: pred.field_columns, + partition_key: pred.partition_key, + range: Some(pred.range), + exprs: pred.exprs.into_iter().map(|expr| expr.into()).collect(), + } + } +} + +/// Parser for Delete predicate and time range +#[derive(Debug, Clone)] +pub struct ParseDeletePredicate { + pub start_time: i64, + pub stop_time: i64, + // conjunctive predicate of binary expressions of = or != + pub predicate: Vec<DeleteExpr>, +} + +impl ParseDeletePredicate { + /// Create a ParseDeletePredicate + pub fn new(start_time: i64, stop_time: i64, predicate: Vec<DeleteExpr>) -> Self { + Self { + start_time, + stop_time, + predicate, + } + } + + /// Parse and convert the delete gRPC API input into a ParseDeletePredicate to send to the server + pub fn try_new(start: &str, stop: &str, predicate: &str) -> Result<Self> { + // parse and check time range + let (start_time, stop_time) = Self::parse_time_range(start, stop)?; + + // Parse the predicate + let delete_exprs = Self::parse_predicate(predicate)?; + + Ok(Self::new(start_time, stop_time, delete_exprs)) + } + + /// Parse the predicate and convert it into a list of delete expressions. + /// A delete predicate is a conjunctive expression of many + /// binary expressions of 'column = constant' or 'column != constant' + /// + pub fn parse_predicate(predicate: &str) -> Result<Vec<DeleteExpr>> { + if predicate.is_empty() { + return Ok(vec![]); + } + + // "DELETE FROM table_name WHERE predicate" + // Table name can be anything to have sqlparser work on the right sql syntax + let mut sql = "DELETE FROM table_name WHERE ".to_string(); + sql.push_str(predicate); + + // parse the delete sql + let dialect = GenericDialect {}; + let ast = Parser::parse_sql(&dialect, sql.as_str()); + match ast { + Err(parse_err) => { + let error_str = format!("{}, {}", predicate, parse_err); + Err(Error::InvalidSyntax { value: error_str }) + } + Ok(mut stmt) => { + if stmt.len() != 1 { + return Err(Error::InvalidSemantics { + value: predicate.to_string(), + }); + } + + let stmt = stmt.pop(); + match stmt { + Some(Statement::Delete { + table_name: _, + selection: Some(expr), + ..
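
An aside on what `parse_predicate` accepts and rejects, drawn from the tests later in this file (illustrative, not part of the patch): only conjunctions of binary `=` / `!=` comparisons survive; everything else is reported as unsupported.

```rust
// Accepted: conjunctive =/!= comparisons become structured DeleteExprs.
let exprs = ParseDeletePredicate::parse_predicate(r#"city=Boston and cost!=100"#).unwrap();
assert_eq!(exprs.len(), 2);

// Rejected: OR, arithmetic, and ordering operators all fail.
assert!(ParseDeletePredicate::parse_predicate(r#"cost > 100"#).is_err());
```
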
+ }) => { + // split this expr into smaller binary expressions if any + let mut exprs = vec![]; + let split = Self::split_members(&expr, &mut exprs); + if !split { + return Err(Error::NotSupportPredicate { + value: predicate.to_string(), + }); + } + Ok(exprs) + } + _ => Err(Error::InvalidSemantics { + value: predicate.to_string(), + }), + } + } + } + } + + pub fn build_delete_predicate( + start_time: String, + stop_time: String, + predicate: String, + ) -> Result<DeletePredicate> { + // parse time range and the predicate + let parse_delete_pred = ParseDeletePredicate::try_new( + start_time.as_str(), + stop_time.as_str(), + predicate.as_str(), + )?; + + Ok(parse_delete_pred.into()) + } + + /// Recursively split all "AND" expressions into smaller ones + /// Example: "A AND B AND C" => [A, B, C] + /// Return false if not all of them are AND of binary expression of + /// "column_name = literal" or "column_name != literal" + /// + /// The split expressions will be converted into delete expressions + pub fn split_members(predicate: &SqlParserExpr, predicates: &mut Vec<DeleteExpr>) -> bool { + // The code below is built to be compatible with + // https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go + match predicate { + SqlParserExpr::BinaryOp { + left, + op: BinaryOperator::And, + right, + } => { + if !Self::split_members(left, predicates) { + return false; + } + if !Self::split_members(right, predicates) { + return false; + } + } + SqlParserExpr::BinaryOp { left, op, right } => { + // Verify Operator + let op = match op { + BinaryOperator::Eq => Operator::Eq, + BinaryOperator::NotEq => Operator::NotEq, + _ => return false, + }; + + // verify if left is an identifier (column name) + let column = match &**left { + SqlParserExpr::Identifier(Ident { + value, + quote_style: _, // all quotes are ignored as done in idpe + }) => Expr::Column(Column { + relation: None, + name: value.to_string(), + }), + _ => return false, // not a column name + }; + + // verify if right is a literal or an identifier (e.g. a column name) + let value = match &**right { + SqlParserExpr::Identifier(Ident { + value, + quote_style: _, + }) => lit(value.to_string()), + SqlParserExpr::Value(Value::DoubleQuotedString(value)) => { + lit(value.to_string()) + } + SqlParserExpr::Value(Value::SingleQuotedString(value)) => { + lit(value.to_string()) + } + SqlParserExpr::Value(Value::NationalStringLiteral(value)) => { + lit(value.to_string()) + } + SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()), + SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::<i64>() { + Ok(v) => lit(v), + Err(_) => lit(v.parse::<f64>().unwrap()), + }, + SqlParserExpr::Value(Value::Boolean(v)) => lit(*v), + _ => return false, // not a literal + }; + + let expr = Expr::BinaryExpr { + left: Box::new(column), + op, + right: Box::new(value), + }; + let expr: Result<DeleteExpr, _> = expr.try_into(); + match expr { + Ok(expr) => { + predicates.push(expr); + } + Err(_) => { + // cannot convert + return false; + } + } + } + _ => return false, + } + + true + } + + /// Parse a time and return its time in nanoseconds + pub fn parse_time(input: &str) -> Result<i64> { + // This input can be in timestamp form that ends with Z such as 1970-01-01T00:00:00Z + // See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame + let datetime_result = DateTime::parse_from_rfc3339(input); + match datetime_result { + Ok(datetime) => Ok(datetime.timestamp_nanos()), + Err(timestamp_err) => { + // See if it is in nanosecond form
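
For reference, `parse_time` accepts either an RFC 3339 timestamp or raw nanoseconds; a sketch of the behavior the tests below pin down (illustrative only):

```rust
assert_eq!(ParseDeletePredicate::parse_time("1970-01-01T00:00:00Z").unwrap(), 0);
assert_eq!(ParseDeletePredicate::parse_time("123").unwrap(), 123);
assert!(ParseDeletePredicate::parse_time("123gdb").is_err());
```
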
+ let time_result = input.parse::<i64>(); + match time_result { + Ok(nano) => Ok(nano), + Err(nano_err) => { + // wrong format, return both errors + let error_str = format!("{}, {}", timestamp_err, nano_err); + Err(Error::InvalidTimestamp { value: error_str }) + } + } + } + } + } + + /// Parse a time range [start, stop] + pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> { + let start_time = Self::parse_time(start)?; + let stop_time = Self::parse_time(stop)?; + if start_time > stop_time { + return Err(Error::InvalidTimeRange { + start: start.to_string(), + stop: stop.to_string(), + }); + } + + Ok((start_time, stop_time)) + } +} + +impl From<ParseDeletePredicate> for DeletePredicate { + fn from(pred: ParseDeletePredicate) -> Self { + Self { + table_names: None, + field_columns: None, + partition_key: None, + range: TimestampRange { + start: pred.start_time, + end: pred.stop_time, + }, + exprs: pred.predicate, + } + } +} + +// Note that this struct and its functions are used to parse FLUX DELETE, +// https://docs.influxdata.com/influxdb/v2.0/write-data/delete-data/, which happens before +// the parsing of timestamps and the SQL predicate. The examples below will show FLUX DELETE's syntax, which is +// different from SQL syntax, so we need this extra parsing step before invoking sqlparser to parse the +// SQL-format predicates and timestamps +#[derive(Debug, Default, PartialEq, Clone)] +/// data of a parsed delete +pub struct ParsedDelete { + /// Empty string, "", if no table specified + pub table_name: String, + pub start_time: String, + pub stop_time: String, + pub predicate: String, +} + +/// Return parsed data of an influx delete: +/// A few input examples and their parsed results: +/// {"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"} +/// => table_name="mytable", start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=\"Orient.local\"" +/// {"predicate":"host=Orient.local and val != 50","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"} +/// => start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=Orient.local and val != 50" +pub fn parse_delete(input: &str) -> Result<ParsedDelete> { + let parsed_obj: serde_json::Value = + serde_json::from_str(input).context(DeleteInvalid { value: input })?; + let mut parsed_delete = ParsedDelete::default(); + + if let serde_json::Value::Object(items) = parsed_obj { + for item in items { + // The value must be type String + if let Some(val) = item.1.as_str() { + match item.0.to_lowercase().as_str() { + "start" => parsed_delete.start_time = val.to_string(), + "stop" => parsed_delete.stop_time = val.to_string(), + "predicate" => parsed_delete.predicate = val.to_string(), + _ => { + return Err(Error::DeleteKeywordInvalid { + value: input.to_string(), + }) + } + } + } else { + return Err(Error::DeleteValueInvalid { + value: input.to_string(), + }); + } + } + } else { + return Err(Error::DeleteObjectInvalid { + value: input.to_string(), + }); + } + + // Start or stop is empty + if parsed_delete.start_time.is_empty() || parsed_delete.stop_time.is_empty() { + return Err(Error::DeleteStartStopInvalid { + value: input.to_string(), + }); + } + + // Extract table from the predicate if any + if parsed_delete.predicate.contains(FLUX_TABLE) { + // since predicate is a conjunctive expression, split it by "and" + let predicate = parsed_delete + .predicate + .replace(" AND ", " and ") + .replace(" ANd ", " and ") + .replace(" And ", " and ") +
.replace(" AnD ", " and "); + + let split: Vec<&str> = predicate.split("and").collect(); + + let mut predicate_no_table = "".to_string(); + for s in split { + if s.contains(FLUX_TABLE) { + // This should be in the form "_measurement = <table_name>"; extract the + // table name by replacing the rest with "" + let table_name = s + .replace(FLUX_TABLE, "") + .replace("=", "") + .trim() + .to_string(); + // White spaces in table names are not supported + if table_name.contains(' ') { + return Err(Error::DeleteTableInvalid { + value: input.to_string(), + }); + } + parsed_delete.table_name = table_name; + } else { + // This is a normal column comparison, put it back to send to sqlparser later + if !predicate_no_table.is_empty() { + predicate_no_table.push_str(" and ") + } + predicate_no_table.push_str(s.trim()); + } + } + parsed_delete.predicate = predicate_no_table; + } + + Ok(parsed_delete) +} + +#[cfg(test)] +mod tests { + use crate::delete_expr::{Op, Scalar}; + + use super::*; + + #[test] + fn test_time_range_valid() { + let start = r#"100"#; + let stop = r#"100"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (100, 100); + assert_eq!(result, expected); + + let start = r#"100"#; + let stop = r#"200"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (100, 200); + assert_eq!(result, expected); + + let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 + let stop = r#"1970-01-01T00:00:00Z"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (0, 0); + assert_eq!(result, expected); + + // let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 + // let stop = r#"now()"#; // -- Not working. Need to find a way to test this + // let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + // let expected = (0, 0); + // assert_eq!(result, expected); + + let start = r#"1970-01-01T00:00:00Z"#; + let stop = r#"100"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (0, 100); + assert_eq!(result, expected); + + let start = r#"1970-01-01T00:00:00Z"#; + let stop = r#"1970-01-01T00:01:00Z"#; + let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); + let expected = (0, 60000000000); + assert_eq!(result, expected); + } + + #[test] + fn test_time_range_invalid() { + let start = r#"100"#; + let stop = r#"-100"#; + let result = ParseDeletePredicate::parse_time_range(start, stop); + assert!(result.is_err()); + + let start = r#"100"#; + let stop = r#"50"#; + let result = ParseDeletePredicate::parse_time_range(start, stop); + assert!(result.is_err()); + + let start = r#"100"#; + let stop = r#"1970-01-01T00:00:00Z"#; + let result = ParseDeletePredicate::parse_time_range(start, stop); + assert!(result.is_err()); + + let start = r#"1971-09-01T00:00:10Z"#; + let stop = r#"1971-09-01T00:00:05Z"#; + let result = ParseDeletePredicate::parse_time_range(start, stop); + assert!(result.is_err()); + } + + #[test] + fn test_parse_timestamp() { + let input = r#"123"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 123); + + // must parse time + let input = r#"1970-01-01T00:00:00Z"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 0); + + let input = r#"1971-02-01T15:30:21Z"#; + let time = ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, 34270221000000000); + } + + #[test] + fn test_parse_timestamp_negative() { + let input = r#"-123"#; + let time =
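
The `_measurement` extraction in `parse_delete` above pulls the table name out of the predicate and re-joins the remaining comparisons for sqlparser; for example (mirroring `test_parse_delete_full` below, illustrative only):

```rust
let delete_str = r#"{"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
let parsed = parse_delete(delete_str).unwrap();
assert_eq!(parsed.table_name, "mytable");
assert_eq!(parsed.predicate, r#"host="Orient.local""#);
```
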
ParseDeletePredicate::parse_time(input).unwrap(); + assert_eq!(time, -123); + } + + #[test] + fn test_parse_timestamp_invalid() { + let input = r#"123gdb"#; + ParseDeletePredicate::parse_time(input).unwrap_err(); + + let input = r#"1970-01-01T00:00:00"#; + ParseDeletePredicate::parse_time(input).unwrap_err(); + + // It turns out this is not invalid but returns 1971 + let input = r#"1971-02-01:30:21Z"#; + ParseDeletePredicate::parse_time(input).unwrap_err(); + } + + #[test] + fn test_parse_timestamp_out_of_range() { + let input = r#"99999999999999999999999999999999"#; + let time = ParseDeletePredicate::parse_time(input); + assert!(time.is_err()); + } + + #[test] + fn test_parse_predicate() { + let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#; + let result = ParseDeletePredicate::parse_predicate(pred).unwrap(); + + println!("{:#?}", result); + + let expected = vec![ + DeleteExpr::new( + "city".to_string(), + Op::Eq, + Scalar::String("Boston".to_string()), + ), + DeleteExpr::new("cost".to_string(), Op::Ne, Scalar::I64(100)), + DeleteExpr::new( + "state".to_string(), + Op::Ne, + Scalar::String("MA".to_string()), + ), + DeleteExpr::new("temp".to_string(), Op::Eq, Scalar::F64((87.5).into())), + ]; + + assert_eq!(result, expected) + } + + #[test] + fn test_parse_predicate_invalid() { + let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR + let result = ParseDeletePredicate::parse_predicate(pred); + assert!(result.is_err()); + + let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1 + let result = ParseDeletePredicate::parse_predicate(pred); + assert!(result.is_err()); + + let pred = r#"cost > 100"#; // > + let result = ParseDeletePredicate::parse_predicate(pred); + assert!(result.is_err()); + + let pred = r#"cost <= 100"#; // <= + let result = ParseDeletePredicate::parse_predicate(pred); + assert!(result.is_err()); + + let pred = r#"cost gt 100"#; // 'gt' keyword + let result = ParseDeletePredicate::parse_predicate(pred); + assert!(result.is_err()); + + let pred = r#"city = cost = 100"#; // chained = + let result = ParseDeletePredicate::parse_predicate(pred); + assert!(result.is_err()); + } + + #[test] + fn test_full_delete_pred() { + let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 + let stop = r#"200"#; + let pred = r#"cost != 100"#; + + let result = ParseDeletePredicate::try_new(start, stop, pred).unwrap(); + assert_eq!(result.start_time, 0); + assert_eq!(result.stop_time, 200); + + let expected = vec![DeleteExpr::new( + "cost".to_string(), + Op::Ne, + Scalar::I64(100), + )]; + assert_eq!(result.predicate, expected); + } + + #[test] + fn test_full_delete_pred_invalid_time_range() { + let start = r#"100"#; + let stop = r#"50"#; + let pred = r#"cost != 100"#; + + let result = ParseDeletePredicate::try_new(start, stop, pred); + assert!(result.is_err()); + } + + #[test] + fn test_full_delete_pred_invalid_pred() { + let start = r#"100"#; + let stop = r#"200"#; + let pred = r#"cost > 100"#; + + let result = ParseDeletePredicate::try_new(start, stop, pred); + assert!(result.is_err()); + } + + #[test] + fn test_parse_delete_full() { + let delete_str = r#"{"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#; + + let expected = ParsedDelete { + table_name: "mytable".to_string(), + predicate: "host=\"Orient.local\"".to_string(), + start_time: "1970-01-01T00:00:00Z".to_string(), + stop_time: "2070-01-02T00:00:00Z".to_string(), + }; + + let result = parse_delete(delete_str).unwrap(); +
assert_eq!(expected, result); + } + + #[test] + fn test_parse_delete_no_table() { + let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#; + + let expected = ParsedDelete { + table_name: "".to_string(), + predicate: "host=\"Orient.local\"".to_string(), + start_time: "1970-01-01T00:00:00Z".to_string(), + stop_time: "2070-01-02T00:00:00Z".to_string(), + }; + + let result = parse_delete(delete_str).unwrap(); + assert_eq!(expected, result); + } + + #[test] + fn test_parse_delete_empty_predicate() { + let delete_str = + r#"{"start":"1970-01-01T00:00:00Z","predicate":"","stop":"2070-01-02T00:00:00Z"}"#; + + let expected = ParsedDelete { + table_name: "".to_string(), + predicate: "".to_string(), + start_time: "1970-01-01T00:00:00Z".to_string(), + stop_time: "2070-01-02T00:00:00Z".to_string(), + }; + + let result = parse_delete(delete_str).unwrap(); + assert_eq!(expected, result); + } + + #[test] + fn test_parse_delete_no_predicate() { + let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#; + + let expected = ParsedDelete { + table_name: "".to_string(), + predicate: "".to_string(), + start_time: "1970-01-01T00:00:00Z".to_string(), + stop_time: "2070-01-02T00:00:00Z".to_string(), + }; + + let result = parse_delete(delete_str).unwrap(); + assert_eq!(expected, result); + } + + // NGA todo: check content of error messages + #[test] + fn test_parse_delete_negative() { + // invalid key + let delete_str = r#"{"invalid":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#; + let result = parse_delete(delete_str); + let err = result.unwrap_err(); + assert!(err + .to_string() + .contains("Invalid key which is either 'start', 'stop', or 'predicate'")); + + // invalid timestamp value + let delete_str = r#"{"start":123,"stop":"2070-01-02T00:00:00Z"}"#; + let result = parse_delete(delete_str); + let err = result.unwrap_err(); + assert!(err + .to_string() + .contains("Invalid timestamp or predicate value")); + + // invalid JSON + let delete_str = r#"{"start":"1970-01-01T00:00:00Z",;"stop":"2070-01-02T00:00:00Z"}"#; + let result = parse_delete(delete_str); + let err = result.unwrap_err(); + assert!(err.to_string().contains("Unable to parse delete string")); + } +} diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index 12321e4a54..638eeba423 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -1,4 +1,5 @@ pub mod delete_expr; +pub mod delete_predicate; pub mod predicate; pub mod regex; pub mod serialize; diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs index f3d823b715..81072c5cff 100644 --- a/predicate/src/predicate.rs +++ b/predicate/src/predicate.rs @@ -11,75 +11,12 @@ use std::{ use data_types::timestamp::TimestampRange; use datafusion::{ error::DataFusionError, - logical_plan::{col, lit, lit_timestamp_nano, Column, Expr, Operator}, + logical_plan::{col, lit_timestamp_nano, Expr, Operator}, optimizer::utils, }; use datafusion_util::{make_range_expr, AndExprBuilder}; use internal_types::schema::TIME_COLUMN_NAME; use observability_deps::tracing::debug; -use sqlparser::{ - ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value}, - dialect::GenericDialect, - parser::Parser, -}; - -use snafu::{ResultExt, Snafu}; - -use chrono::DateTime; - -const FLUX_TABLE: &str = "_measurement"; - -// Parse Delete Predicates -/// Parse Error -#[derive(Debug, Snafu)] -pub enum Error { - /// Invalid time format - #[snafu(display("Invalid timestamp: {}", value))] - 
InvalidTimestamp { value: String }, - - /// Invalid time range - #[snafu(display("Invalid time range: ({}, {})", start, stop))] - InvalidTimeRange { start: String, stop: String }, - - /// Predicate syntax error - #[snafu(display("Invalid predicate syntax: ({})", value))] - InvalidSyntax { value: String }, - - /// Predicate semantics error - #[snafu(display("Invalid predicate semantics: ({})", value))] - InvalidSemantics { value: String }, - - /// Predicate include non supported expression - #[snafu(display("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_ame != literal': ({})", value))] - NotSupportPredicate { value: String }, - - #[snafu(display(r#"Unable to parse delete string '{}'"#, value))] - DeleteInvalid { - source: serde_json::Error, - value: String, - }, - - #[snafu(display( - r#"Invalid key which is either 'start', 'stop', or 'predicate': '{}'"#, - value - ))] - DeleteKeywordInvalid { value: String }, - - #[snafu(display(r#"Invalid timestamp or predicate value: '{}'"#, value))] - DeleteValueInvalid { value: String }, - - #[snafu(display(r#"Invalid JSON format of delete string '{}'"#, value))] - DeleteObjectInvalid { value: String }, - - #[snafu(display(r#"Invalid table name in delete '{}'"#, value))] - DeleteTableInvalid { value: String }, - - #[snafu(display(r#"Delete must include a start time and a stop time'{}'"#, value))] - DeleteStartStopInvalid { value: String }, -} - -/// Result type for Parser Cient -pub type Result = std::result::Result; /// This `Predicate` represents the empty predicate (aka that /// evaluates to true for all rows). @@ -532,332 +469,6 @@ impl PredicateBuilder { } } -/// Parser for Delete predicate and time range -#[derive(Debug, Clone)] -pub struct ParseDeletePredicate { - pub start_time: i64, - pub stop_time: i64, - // conjunctive predicate of binary expressions of = or != - pub predicate: Vec, -} - -impl ParseDeletePredicate { - /// Create a ParseDeletePredicate - pub fn new(start_time: i64, stop_time: i64, predicate: Vec) -> Self { - Self { - start_time, - stop_time, - predicate, - } - } - - /// Parse and convert the delete grpc API into ParseDeletePredicate to send to server - pub fn try_new(start: &str, stop: &str, predicate: &str) -> Result { - // parse and check time range - let (start_time, stop_time) = Self::parse_time_range(start, stop)?; - - // Parse the predicate - let delete_exprs = Self::parse_predicate(predicate)?; - - Ok(Self::new(start_time, stop_time, delete_exprs)) - } - - /// Parse the predicate and convert it into datafusion expression - /// A delete predicate is a conjunctive expression of many - /// binary expressions of 'colum = constant' or 'column != constant' - /// - pub fn parse_predicate(predicate: &str) -> Result> { - if predicate.is_empty() { - return Ok(vec![]); - } - - // "DELETE FROM table_name WHERE predicate" - // Table name can be anything to have sqlparser work on the right sql syntax - let mut sql = "DELETE FROM table_name WHERE ".to_string(); - sql.push_str(predicate); - - // parse the delete sql - let dialect = GenericDialect {}; - let ast = Parser::parse_sql(&dialect, sql.as_str()); - match ast { - Err(parse_err) => { - let error_str = format!("{}, {}", predicate, parse_err); - Err(Error::InvalidSyntax { value: error_str }) - } - Ok(mut stmt) => { - if stmt.len() != 1 { - return Err(Error::InvalidSemantics { - value: predicate.to_string(), - }); - } - - let stmt = stmt.pop(); - match stmt { - Some(Statement::Delete { - table_name: _, - selection: Some(expr), - .. 
- }) => { - // split this expr into smaller binary if any - let mut exprs = vec![]; - let split = Self::split_members(&expr, &mut exprs); - if !split { - return Err(Error::NotSupportPredicate { - value: predicate.to_string(), - }); - } - Ok(exprs) - } - _ => Err(Error::InvalidSemantics { - value: predicate.to_string(), - }), - } - } - } - } - - pub fn build_delete_predicate( - start_time: String, - stop_time: String, - predicate: String, - ) -> Result { - // parse time range and the predicate - let parse_delete_pred = ParseDeletePredicate::try_new( - start_time.as_str(), - stop_time.as_str(), - predicate.as_str(), - )?; - - let mut del_predicate = PredicateBuilder::new() - .timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time) - .build(); - - // Add the predicate binary expressions - for expr in parse_delete_pred.predicate { - del_predicate.exprs.push(expr); - } - - Ok(del_predicate) - } - - /// Recursively split all "AND" expressions into smaller ones - /// Example: "A AND B AND C" => [A, B, C] - /// Return false if not all of them are AND of binary expression of - /// "column_name = literal" or "column_name != literal" - /// - /// The split expressions will be converted into data fusion expressions - pub fn split_members(predicate: &SqlParserExpr, predicates: &mut Vec) -> bool { - // The below code built to be compatible with - // https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go - match predicate { - SqlParserExpr::BinaryOp { - left, - op: BinaryOperator::And, - right, - } => { - if !Self::split_members(left, predicates) { - return false; - } - if !Self::split_members(right, predicates) { - return false; - } - } - SqlParserExpr::BinaryOp { left, op, right } => { - // Verify Operator - let op = match op { - BinaryOperator::Eq => Operator::Eq, - BinaryOperator::NotEq => Operator::NotEq, - _ => return false, - }; - - // verify if left is identifier (column name) - let column = match &**left { - SqlParserExpr::Identifier(Ident { - value, - quote_style: _, // all quotes are ignored as done in idpe - }) => Expr::Column(Column { - relation: None, - name: value.to_string(), - }), - _ => return false, // not a column name - }; - - // verify if right is a literal or an identifier (e.g column name) - let value = match &**right { - SqlParserExpr::Identifier(Ident { - value, - quote_style: _, - }) => lit(value.to_string()), - SqlParserExpr::Value(Value::DoubleQuotedString(value)) => { - lit(value.to_string()) - } - SqlParserExpr::Value(Value::SingleQuotedString(value)) => { - lit(value.to_string()) - } - SqlParserExpr::Value(Value::NationalStringLiteral(value)) => { - lit(value.to_string()) - } - SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()), - SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::() { - Ok(v) => lit(v), - Err(_) => lit(v.parse::().unwrap()), - }, - SqlParserExpr::Value(Value::Boolean(v)) => lit(*v), - _ => return false, // not a literal - }; - - let expr = Expr::BinaryExpr { - left: Box::new(column), - op, - right: Box::new(value), - }; - predicates.push(expr); - } - _ => return false, - } - - true - } - - /// Parse a time and return its time in nanosecond - pub fn parse_time(input: &str) -> Result { - // This input can be in timestamp form that end with Z such as 1970-01-01T00:00:00Z - // See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame - let datetime_result = DateTime::parse_from_rfc3339(input); - match 
datetime_result { - Ok(datetime) => Ok(datetime.timestamp_nanos()), - Err(timestamp_err) => { - // See if it is in nanosecond form - let time_result = input.parse::(); - match time_result { - Ok(nano) => Ok(nano), - Err(nano_err) => { - // wrong format, return both error - let error_str = format!("{}, {}", timestamp_err, nano_err); - Err(Error::InvalidTimestamp { value: error_str }) - } - } - } - } - } - - /// Parse a time range [start, stop] - pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> { - let start_time = Self::parse_time(start)?; - let stop_time = Self::parse_time(stop)?; - if start_time > stop_time { - return Err(Error::InvalidTimeRange { - start: start.to_string(), - stop: stop.to_string(), - }); - } - - Ok((start_time, stop_time)) - } -} - -// Note that this struct and its functions are used to parse FLUX DELETE, -// https://docs.influxdata.com/influxdb/v2.0/write-data/delete-data/, which happens before -// the parsing of timestamps and sql predicate. The examples below will show FLUX DELETE's syntax which is -// different from SQL syntax so we need this extra parsing step before invoking sqlparser to parse the -// sql-format predicates and timestamps -#[derive(Debug, Default, PartialEq, Clone)] -/// data of a parsed delete -pub struct ParsedDelete { - /// Empty string, "", if no table specified - pub table_name: String, - pub start_time: String, - pub stop_time: String, - pub predicate: String, -} - -/// Return parsed data of an influx delete: -/// A few input examples and their parsed results: -/// {"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"} -/// => table_name="mytable", start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=\"Orient.local\""" -/// {"predicate":"host=Orient.local and val != 50","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"} -/// => start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=Orient.local and val != 50" -pub fn parse_delete(input: &str) -> Result { - let parsed_obj: serde_json::Value = - serde_json::from_str(input).context(DeleteInvalid { value: input })?; - let mut parsed_delete = ParsedDelete::default(); - - if let serde_json::Value::Object(items) = parsed_obj { - for item in items { - // The value must be type String - if let Some(val) = item.1.as_str() { - match item.0.to_lowercase().as_str() { - "start" => parsed_delete.start_time = val.to_string(), - "stop" => parsed_delete.stop_time = val.to_string(), - "predicate" => parsed_delete.predicate = val.to_string(), - _ => { - return Err(Error::DeleteKeywordInvalid { - value: input.to_string(), - }) - } - } - } else { - return Err(Error::DeleteValueInvalid { - value: input.to_string(), - }); - } - } - } else { - return Err(Error::DeleteObjectInvalid { - value: input.to_string(), - }); - } - - // Start or stop is empty - if parsed_delete.start_time.is_empty() || parsed_delete.stop_time.is_empty() { - return Err(Error::DeleteStartStopInvalid { - value: input.to_string(), - }); - } - - // Extract table from the predicate if any - if parsed_delete.predicate.contains(FLUX_TABLE) { - // since predicate is a conjunctive expression, split them by "and" - let predicate = parsed_delete - .predicate - .replace(" AND ", " and ") - .replace(" ANd ", " and ") - .replace(" And ", " and ") - .replace(" AnD ", " and "); - - let split: Vec<&str> = predicate.split("and").collect(); - - let mut predicate_no_table = "".to_string(); - for s in 
split { - if s.contains(FLUX_TABLE) { - // This should be in form "_measurement = " - // only by replacing the rest with "" - let table_name = s - .replace(FLUX_TABLE, "") - .replace("=", "") - .trim() - .to_string(); - // Do not support white spaces in table name - if table_name.contains(' ') { - return Err(Error::DeleteTableInvalid { - value: input.to_string(), - }); - } - parsed_delete.table_name = table_name; - } else { - // This is a normal column comparison, put it back to send to sqlparser later - if !predicate_no_table.is_empty() { - predicate_no_table.push_str(" and ") - } - predicate_no_table.push_str(s.trim()); - } - } - parsed_delete.predicate = predicate_no_table; - } - - Ok(parsed_delete) -} - #[cfg(test)] mod tests { use super::*; @@ -999,282 +610,4 @@ mod tests { assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]"); } - - // The delete predicate - #[test] - fn test_time_range_valid() { - let start = r#"100"#; - let stop = r#"100"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (100, 100); - assert_eq!(result, expected); - - let start = r#"100"#; - let stop = r#"200"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (100, 200); - assert_eq!(result, expected); - - let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 - let stop = r#"1970-01-01T00:00:00Z"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (0, 0); - assert_eq!(result, expected); - - // let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 - // let stop = r#"now()"#; // -- Not working. Need to find a way to test this - // let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - // let expected = (0, 0); - // assert_eq!(result, expected); - - let start = r#"1970-01-01T00:00:00Z"#; - let stop = r#"100"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (0, 100); - assert_eq!(result, expected); - - let start = r#"1970-01-01T00:00:00Z"#; - let stop = r#"1970-01-01T00:01:00Z"#; - let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap(); - let expected = (0, 60000000000); - assert_eq!(result, expected); - } - - #[test] - fn test_time_range_invalid() { - let start = r#"100"#; - let stop = r#"-100"#; - let result = ParseDeletePredicate::parse_time_range(start, stop); - assert!(result.is_err()); - - let start = r#"100"#; - let stop = r#"50"#; - let result = ParseDeletePredicate::parse_time_range(start, stop); - assert!(result.is_err()); - - let start = r#"100"#; - let stop = r#"1970-01-01T00:00:00Z"#; - let result = ParseDeletePredicate::parse_time_range(start, stop); - assert!(result.is_err()); - - let start = r#"1971-09-01T00:00:10Z"#; - let stop = r#"1971-09-01T00:00:05Z"#; - let result = ParseDeletePredicate::parse_time_range(start, stop); - assert!(result.is_err()); - } - - #[test] - fn test_parse_timestamp() { - let input = r#"123"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 123); - - // must parse time - let input = r#"1970-01-01T00:00:00Z"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 0); - - let input = r#"1971-02-01T15:30:21Z"#; - let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, 34270221000000000); - } - - #[test] - fn test_parse_timestamp_negative() { - let input = r#"-123"#; - 
let time = ParseDeletePredicate::parse_time(input).unwrap(); - assert_eq!(time, -123); - } - - #[test] - fn test_parse_timestamp_invalid() { - let input = r#"123gdb"#; - ParseDeletePredicate::parse_time(input).unwrap_err(); - - let input = r#"1970-01-01T00:00:00"#; - ParseDeletePredicate::parse_time(input).unwrap_err(); - - // It turn out this is not invalid but return1 1971 - let input = r#"1971-02-01:30:21Z"#; - ParseDeletePredicate::parse_time(input).unwrap_err(); - } - - #[test] - fn test_parse_timestamp_out_of_range() { - let input = r#"99999999999999999999999999999999"#; - let time = ParseDeletePredicate::parse_time(input); - assert!(time.is_err()); - } - - #[test] - fn test_parse_predicate() { - let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#; - let result = ParseDeletePredicate::parse_predicate(pred).unwrap(); - - println!("{:#?}", result); - - let mut expected = vec![]; - let e = col("city").eq(lit("Boston")); - expected.push(e); - let val: i64 = 100; - let e = col("cost").not_eq(lit(val)); - expected.push(e); - let e = col("state").not_eq(lit("MA")); - expected.push(e); - let e = col("temp").eq(lit(87.5)); - expected.push(e); - - assert_eq!(result, expected) - } - - #[test] - fn test_parse_predicate_invalid() { - let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR - let result = ParseDeletePredicate::parse_predicate(pred); - assert!(result.is_err()); - - let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1 - let result = ParseDeletePredicate::parse_predicate(pred); - assert!(result.is_err()); - - let pred = r#"cost > 100"#; // > - let result = ParseDeletePredicate::parse_predicate(pred); - assert!(result.is_err()); - - let pred = r#"cost <= 100"#; // < - let result = ParseDeletePredicate::parse_predicate(pred); - assert!(result.is_err()); - - let pred = r#"cost gt 100"#; // > - let result = ParseDeletePredicate::parse_predicate(pred); - assert!(result.is_err()); - - let pred = r#"city = cost = 100"#; // > - let result = ParseDeletePredicate::parse_predicate(pred); - assert!(result.is_err()); - } - - #[test] - fn test_full_delete_pred() { - let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0 - let stop = r#"200"#; - let pred = r#"cost != 100"#; - - let result = ParseDeletePredicate::try_new(start, stop, pred).unwrap(); - assert_eq!(result.start_time, 0); - assert_eq!(result.stop_time, 200); - - let mut expected = vec![]; - let num: i64 = 100; - let e = col("cost").not_eq(lit(num)); - expected.push(e); - assert_eq!(result.predicate, expected); - } - - #[test] - fn test_full_delete_pred_invalid_time_range() { - let start = r#"100"#; - let stop = r#"50"#; - let pred = r#"cost != 100"#; - - let result = ParseDeletePredicate::try_new(start, stop, pred); - assert!(result.is_err()); - } - - #[test] - fn test_full_delete_pred_invalid_pred() { - let start = r#"100"#; - let stop = r#"200"#; - let pred = r#"cost > 100"#; - - let result = ParseDeletePredicate::try_new(start, stop, pred); - assert!(result.is_err()); - } - - #[test] - fn test_parse_delete_full() { - let delete_str = r#"{"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#; - - let expected = ParsedDelete { - table_name: "mytable".to_string(), - predicate: "host=\"Orient.local\"".to_string(), - start_time: "1970-01-01T00:00:00Z".to_string(), - stop_time: "2070-01-02T00:00:00Z".to_string(), - }; - - let result = parse_delete(delete_str).unwrap(); - assert_eq!(expected, result); - } - - 
#[test] - fn test_parse_delete_no_table() { - let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#; - - let expected = ParsedDelete { - table_name: "".to_string(), - predicate: "host=\"Orient.local\"".to_string(), - start_time: "1970-01-01T00:00:00Z".to_string(), - stop_time: "2070-01-02T00:00:00Z".to_string(), - }; - - let result = parse_delete(delete_str).unwrap(); - assert_eq!(expected, result); - } - - #[test] - fn test_parse_delete_empty_predicate() { - let delete_str = - r#"{"start":"1970-01-01T00:00:00Z","predicate":"","stop":"2070-01-02T00:00:00Z"}"#; - - let expected = ParsedDelete { - table_name: "".to_string(), - predicate: "".to_string(), - start_time: "1970-01-01T00:00:00Z".to_string(), - stop_time: "2070-01-02T00:00:00Z".to_string(), - }; - - let result = parse_delete(delete_str).unwrap(); - assert_eq!(expected, result); - } - - #[test] - fn test_parse_delete_no_predicate() { - let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#; - - let expected = ParsedDelete { - table_name: "".to_string(), - predicate: "".to_string(), - start_time: "1970-01-01T00:00:00Z".to_string(), - stop_time: "2070-01-02T00:00:00Z".to_string(), - }; - - let result = parse_delete(delete_str).unwrap(); - assert_eq!(expected, result); - } - - // NGA todo: check content of error messages - #[test] - fn test_parse_delete_negative() { - // invalid key - let delete_str = r#"{"invalid":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#; - let result = parse_delete(delete_str); - let err = result.unwrap_err(); - assert!(err - .to_string() - .contains("Invalid key which is either 'start', 'stop', or 'predicate'")); - - // invalid timestamp value - let delete_str = r#"{"start":123,"stop":"2070-01-02T00:00:00Z"}"#; - let result = parse_delete(delete_str); - let err = result.unwrap_err(); - assert!(err - .to_string() - .contains("Invalid timestamp or predicate value")); - - // invalid JSON - let delete_str = r#"{"start":"1970-01-01T00:00:00Z",;"stop":"2070-01-02T00:00:00Z"}"#; - let result = parse_delete(delete_str); - let err = result.unwrap_err(); - assert!(err.to_string().contains("Unable to parse delete string")); - } } diff --git a/predicate/src/serialize.rs b/predicate/src/serialize.rs index 996b51bbe9..996c625e13 100644 --- a/predicate/src/serialize.rs +++ b/predicate/src/serialize.rs @@ -12,36 +12,24 @@ use data_types::timestamp::TimestampRange; use generated_types::influxdata::iox::catalog::v1 as proto; use snafu::{ResultExt, Snafu}; -use crate::{delete_expr::DeleteExpr, predicate::Predicate}; +use crate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate}; -#[derive(Debug, Snafu)] -pub enum SerializeError { - #[snafu(display("cannot convert datafusion expr: {}", source))] - CannotConvertDataFusionExpr { - source: crate::delete_expr::DataFusionToExprError, - }, -} - -/// Serialize IOx [`Predicate`] to a protobuf object. -pub fn serialize(predicate: &Predicate) -> Result { - let proto_predicate = proto::Predicate { +/// Serialize IOx [`DeletePredicate`] to a protobuf object. 
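
Because `DeleteExpr` maps one-to-one onto the protobuf representation, serialization can no longer fail, which is why `SerializeError` and the `CannotSerializePredicate` variant are removed in this diff. A sketch of the new round trip, mirroring `test_roundtrip` below:

```rust
let proto = serialize(&predicate);            // now infallible
let recovered = deserialize(&proto).unwrap(); // deserialization can still fail
assert_eq!(predicate, recovered);
```
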
+pub fn serialize(predicate: &DeletePredicate) -> proto::Predicate { + proto::Predicate { table_names: serialize_optional_string_set(&predicate.table_names), field_columns: serialize_optional_string_set(&predicate.field_columns), partition_key: serialize_optional_string(&predicate.partition_key), - range: serialize_timestamp_range(&predicate.range), + range: Some(proto::TimestampRange { + start: predicate.range.start, + end: predicate.range.end, + }), exprs: predicate .exprs .iter() - .map(|expr| { - let expr: DeleteExpr = expr - .clone() - .try_into() - .context(CannotConvertDataFusionExpr)?; - Ok(expr.into()) - }) - .collect::, SerializeError>>()?, - }; - Ok(proto_predicate) + .map(|expr| expr.clone().into()) + .collect(), + } } fn serialize_optional_string_set( @@ -57,36 +45,41 @@ fn serialize_optional_string(s: &Option<String>) -> Option<String> -fn serialize_timestamp_range(r: &Option<TimestampRange>) -> Option<proto::TimestampRange> { - r.as_ref().map(|r| proto::TimestampRange { - start: r.start, - end: r.end, - }) -} - #[derive(Debug, Snafu)] pub enum DeserializeError { + #[snafu(display("timestamp range is missing"))] + RangeMissing, + #[snafu(display("cannot deserialize expr: {}", source))] CannotDeserializeExpr { source: crate::delete_expr::ProtoToExprError, }, } -/// Deserialize IOx [`Predicate`] from a protobuf object. -pub fn deserialize(proto_predicate: &proto::Predicate) -> Result<Predicate, DeserializeError> { - let predicate = Predicate { +/// Deserialize IOx [`DeletePredicate`] from a protobuf object. +pub fn deserialize( + proto_predicate: &proto::Predicate, +) -> Result<DeletePredicate, DeserializeError> { + let predicate = DeletePredicate { table_names: deserialize_optional_string_set(&proto_predicate.table_names), field_columns: deserialize_optional_string_set(&proto_predicate.field_columns), partition_key: deserialize_optional_string(&proto_predicate.partition_key), - range: deserialize_timestamp_range(&proto_predicate.range), + range: proto_predicate + .range + .as_ref() + .map(|r| TimestampRange { + start: r.start, + end: r.end, + }) + .ok_or(DeserializeError::RangeMissing)?, exprs: proto_predicate .exprs .iter() .map(|expr| { let expr: DeleteExpr = expr.clone().try_into().context(CannotDeserializeExpr)?; - Ok(expr.into()) + Ok(expr) }) - .collect::<Result<Vec<Expr>, DeserializeError>>()?, + .collect::<Result<Vec<DeleteExpr>, DeserializeError>>()?, }; Ok(predicate) } @@ -101,16 +94,9 @@ fn deserialize_optional_string(s: &Option<String>) -> Option<String> -fn deserialize_timestamp_range(r: &Option<proto::TimestampRange>) -> Option<TimestampRange> { - r.as_ref().map(|r| TimestampRange { - start: r.start, - end: r.end, - }) -} - #[cfg(test)] mod tests { - use crate::predicate::{ParseDeletePredicate, PredicateBuilder}; + use crate::delete_predicate::ParseDeletePredicate; use super::*; @@ -118,12 +104,12 @@ fn test_roundtrip() { let table_name = "my_table"; let predicate = delete_predicate(table_name); - let proto = serialize(&predicate).unwrap(); + let proto = serialize(&predicate); let recovered = deserialize(&proto).unwrap(); assert_eq!(predicate, recovered); } - fn delete_predicate(table_name: &str) -> Predicate { + fn delete_predicate(table_name: &str) -> DeletePredicate { let start_time = "11"; let stop_time = "22"; let predicate = r#"city=Boston and cost!=100 and temp=87.5 and good=true"#; let parse_delete_pred = ParseDeletePredicate::try_new(start_time, stop_time, predicate).unwrap(); - let mut del_predicate_builder = PredicateBuilder::new() - .table(table_name) - .timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time); - - for expr in parse_delete_pred.predicate { - del_predicate_builder = del_predicate_builder.add_expr(expr); - } - - del_predicate_builder.build() + let mut pred:
DeletePredicate = parse_delete_pred.into(); + pred.table_names = Some(IntoIterator::into_iter([table_name.to_string()]).collect()); + pred + } } diff --git a/query/src/lib.rs b/query/src/lib.rs index ca7e78343a..4b48226dc2 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -19,7 +19,10 @@ use internal_types::{ selection::Selection, }; use observability_deps::tracing::{debug, trace}; -use predicate::predicate::{Predicate, PredicateMatch}; +use predicate::{ + delete_predicate::DeletePredicate, + predicate::{Predicate, PredicateMatch}, +}; use hashbrown::HashMap; use std::{fmt::Debug, sync::Arc}; @@ -46,7 +49,7 @@ pub trait QueryChunkMeta: Sized { fn schema(&self) -> Arc<Schema>; // return a reference to delete predicates of the chunk - fn delete_predicates(&self) -> &[Arc<Predicate>]; + fn delete_predicates(&self) -> &[Arc<DeletePredicate>]; } /// A `Database` is the main trait implemented by the IOx subsystems @@ -166,7 +169,7 @@ where self.as_ref().schema() } - fn delete_predicates(&self) -> &[Arc<Predicate>] { + fn delete_predicates(&self) -> &[Arc<DeletePredicate>] { let pred = self.as_ref().delete_predicates(); debug!(?pred, "Delete predicate in QueryChunkMeta"); pred diff --git a/query/src/provider.rs b/query/src/provider.rs index a937a7f278..afbd411b1c 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -774,8 +774,12 @@ impl Deduplicater { // Add Filter operator, FilterExec, if the chunk has delete predicates let del_preds = chunk.delete_predicates(); + let del_preds: Vec<Arc<Predicate>> = del_preds + .iter() + .map(|pred| Arc::new(pred.as_ref().clone().into())) + .collect(); debug!(?del_preds, "Chunk delete predicates"); - let negated_del_expr_val = Predicate::negated_expr(del_preds); + let negated_del_expr_val = Predicate::negated_expr(&del_preds[..]); if let Some(negated_del_expr) = negated_del_expr_val { debug!(?negated_del_expr, "Logical negated expressions"); diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs index 9f4bb0ce5a..658b35522f 100644 --- a/query/src/provider/physical.rs +++ b/query/src/provider/physical.rs @@ -117,9 +117,13 @@ impl ExecutionPlan for IOxReadFilterNode { let selection = Selection::Some(&selection_cols); let del_preds = chunk.delete_predicates(); + let del_preds: Vec<Arc<Predicate>> = del_preds + .iter() + .map(|pred| Arc::new(pred.as_ref().clone().into())) + .collect(); let stream = chunk - .read_filter(&self.predicate, selection, del_preds) + .read_filter(&self.predicate, selection, &del_preds) .map_err(|e| { DataFusionError::Execution(format!( "Error creating scan for table {} chunk {}: {}", diff --git a/query/src/test.rs b/query/src/test.rs index 176c31371a..7e27bdce1b 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -27,6 +27,7 @@ use internal_types::{ }; use observability_deps::tracing::debug; use parking_lot::Mutex; +use predicate::delete_predicate::DeletePredicate; use snafu::Snafu; use std::num::NonZeroU64; use std::{collections::BTreeMap, fmt, sync::Arc}; @@ -175,7 +176,7 @@ pub struct TestChunk { predicate_match: Option<PredicateMatch>, /// Copy of delete predicates passed - delete_predicates: Vec<Arc<Predicate>>, + delete_predicates: Vec<Arc<DeletePredicate>>, /// Order of this chunk relative to other overlapping chunks.
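
At the query boundary the structured delete predicates are widened back into general `Predicate`s via the `From<DeletePredicate>` impl added in this PR; the pattern from provider.rs/physical.rs above, repeated here for clarity:

```rust
// Each Arc<DeletePredicate> is cloned and converted into an Arc<Predicate>.
let del_preds: Vec<Arc<Predicate>> = chunk
    .delete_predicates()
    .iter()
    .map(|pred| Arc::new(pred.as_ref().clone().into()))
    .collect();
```
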
diff --git a/query/src/test.rs b/query/src/test.rs
index 176c31371a..7e27bdce1b 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -27,6 +27,7 @@ use internal_types::{
 };
 use observability_deps::tracing::debug;
 use parking_lot::Mutex;
+use predicate::delete_predicate::DeletePredicate;
 use snafu::Snafu;
 use std::num::NonZeroU64;
 use std::{collections::BTreeMap, fmt, sync::Arc};
@@ -175,7 +176,7 @@ pub struct TestChunk {
     predicate_match: Option<PredicateMatch>,
 
     /// Copy of delete predicates passed
-    delete_predicates: Vec<Arc<Predicate>>,
+    delete_predicates: Vec<Arc<DeletePredicate>>,
 
     /// Order of this chunk relative to other overlapping chunks.
     order: ChunkOrder,
@@ -914,7 +915,7 @@ impl QueryChunkMeta for TestChunk {
     }
 
     // return a reference to delete predicates of the chunk
-    fn delete_predicates(&self) -> &[Arc<Predicate>] {
+    fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
         let pred = &self.delete_predicates;
         debug!(?pred, "Delete predicate in Test Chunk");
diff --git a/query_tests/src/scenarios/delete.rs b/query_tests/src/scenarios/delete.rs
index a6611d61c6..c61de74def 100644
--- a/query_tests/src/scenarios/delete.rs
+++ b/query_tests/src/scenarios/delete.rs
@@ -1,8 +1,9 @@
 //! This module contains testing scenarios for Delete
 
 use data_types::chunk_metadata::ChunkId;
-use datafusion::logical_plan::{col, lit};
-use predicate::predicate::{Predicate, PredicateBuilder};
+use data_types::timestamp::TimestampRange;
+use predicate::delete_expr::DeleteExpr;
+use predicate::delete_predicate::DeletePredicate;
 
 use async_trait::async_trait;
 use query::QueryChunk;
@@ -32,10 +33,13 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
         let lp_lines = vec!["cpu bar=1 10", "cpu bar=2 20"];
 
         // delete predicate
-        let pred = PredicateBuilder::new()
-            .table(table_name)
-            .timestamp_range(0, 25)
-            .build();
+        let pred = DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 0, end: 25 },
+            exprs: vec![],
+        };
 
         // this returns 15 scenarios
         all_delete_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key)
@@ -56,12 +60,17 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
         let lp_lines = vec!["cpu bar=1 10", "cpu bar=2 20"];
 
         // delete predicate
-        let expr = col("bar").eq(lit(1f64));
-        let pred = PredicateBuilder::new()
-            .table(table_name)
-            .timestamp_range(0, 15)
-            .add_expr(expr)
-            .build();
+        let pred = DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 0, end: 15 },
+            exprs: vec![DeleteExpr::new(
+                "bar".to_string(),
+                predicate::delete_expr::Op::Eq,
+                predicate::delete_expr::Scalar::F64((1.0).into()),
+            )],
+        };
 
         // this returns 15 scenarios
         all_delete_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key)
@@ -85,14 +94,24 @@ impl DbSetup for OneDeleteMultiExprsOneChunk {
             "cpu,foo=me bar=1 40",
         ];
         // delete predicate
-        let expr1 = col("bar").eq(lit(1f64));
-        let expr2 = col("foo").eq(lit("me"));
-        let pred = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(0, 32)
-            .add_expr(expr1)
-            .add_expr(expr2)
-            .build();
+        let pred = DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 0, end: 32 },
+            exprs: vec![
+                DeleteExpr::new(
+                    "bar".to_string(),
+                    predicate::delete_expr::Op::Eq,
+                    predicate::delete_expr::Scalar::F64((1.0).into()),
+                ),
+                DeleteExpr::new(
+                    "foo".to_string(),
+                    predicate::delete_expr::Op::Eq,
+                    predicate::delete_expr::Scalar::String("me".to_string()),
+                ),
+            ],
+        };
 
         // this returns 15 scenarios
         all_delete_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key)
@@ -122,22 +141,37 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
         ];
         // delete predicate
         // pred1: delete from cpu where 0 <= time < 32 and bar = 1 and foo = 'me'
-        let expr1 = col("bar").eq(lit(1f64));
-        let expr2 = col("foo").eq(lit("me"));
-        let pred1 = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(0, 32)
-            .add_expr(expr1)
-            .add_expr(expr2)
-            .build();
+        let pred1 = DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 0, end: 32 },
+            exprs: vec![
+                DeleteExpr::new(
+                    "bar".to_string(),
+                    predicate::delete_expr::Op::Eq,
+                    predicate::delete_expr::Scalar::F64((1.0).into()),
+                ),
+                DeleteExpr::new(
+                    "foo".to_string(),
+                    predicate::delete_expr::Op::Eq,
+                    predicate::delete_expr::Scalar::String("me".to_string()),
+                ),
+            ],
+        };
 
         // pred2: delete from cpu where 10 <= time < 45 and bar != 1
-        let expr3 = col("bar").not_eq(lit(1f64));
-        let pred2 = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(10, 45)
-            .add_expr(expr3)
-            .build();
+        let pred2 = DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 10, end: 45 },
+            exprs: vec![DeleteExpr::new(
+                "bar".to_string(),
+                predicate::delete_expr::Op::Ne,
+                predicate::delete_expr::Scalar::F64((1.0).into()),
+            )],
+        };
 
         // build scenarios
         all_delete_scenarios_for_one_chunk(
@@ -171,14 +205,24 @@ impl DbSetup for ThreeDeleteThreeChunks {
         ];
         // delete predicate on chunk 1
         //let i: f64 = 1.0;
-        let expr1 = col("bar").eq(lit(1f64));
-        let expr2 = col("foo").eq(lit("me"));
-        let pred1 = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(0, 32)
-            .add_expr(expr1)
-            .add_expr(expr2)
-            .build();
+        let pred1 = DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 0, end: 32 },
+            exprs: vec![
+                DeleteExpr::new(
+                    "bar".to_string(),
+                    predicate::delete_expr::Op::Eq,
+                    predicate::delete_expr::Scalar::F64((1.0).into()),
+                ),
+                DeleteExpr::new(
+                    "foo".to_string(),
+                    predicate::delete_expr::Op::Eq,
+                    predicate::delete_expr::Scalar::String("me".to_string()),
+                ),
+            ],
+        };
 
         //chunk 2 data
         let lp_lines_2 = vec![
@@ -188,12 +232,17 @@ impl DbSetup for ThreeDeleteThreeChunks {
             "cpu,foo=me bar=5 60",
         ];
         // delete predicate on chunk 1 & chunk 2
-        let expr = col("foo").eq(lit("you"));
-        let pred2 = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(20, 45)
-            .add_expr(expr)
-            .build();
+        let pred2 = DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 20, end: 45 },
+            exprs: vec![DeleteExpr::new(
+                "foo".to_string(),
+                predicate::delete_expr::Op::Eq,
+                predicate::delete_expr::Scalar::String("you".to_string()),
+            )],
+        };
 
         // chunk 3 data
         let lp_lines_3 = vec![
@@ -203,13 +252,17 @@ impl DbSetup for ThreeDeleteThreeChunks {
             "cpu,foo=me bar=8 90", // deleted by pred3
         ];
         // delete predicate on chunk 3
-        let i: f64 = 7.0;
-        let expr = col("bar").not_eq(lit(i));
-        let pred3 = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(75, 95)
-            .add_expr(expr)
-            .build();
+        let pred3 = DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 75, end: 95 },
+            exprs: vec![DeleteExpr::new(
+                "bar".to_string(),
+                predicate::delete_expr::Op::Ne,
+                predicate::delete_expr::Scalar::F64((7.0).into()),
+            )],
+        };
 
         // ----------------------
         // 3 chunks: MUB, RUB, OS
@@ -412,7 +465,7 @@ impl ChunkStage {
 #[derive(Debug, Clone)]
 pub struct Pred<'a> {
     /// Delete predicate
-    predicate: &'a Predicate,
+    predicate: &'a DeletePredicate,
 
     /// At which chunk stage this predicate is applied
     delete_time: DeleteTime,
 }
@@ -464,9 +517,9 @@ impl DeleteTime {
 /// Exhaust tests of chunk stages and their life cycle moves for given set of delete predicates
 async fn all_delete_scenarios_for_one_chunk(
     // These delete predicates are applied at all stages of the chunk life cycle
-    chunk_stage_preds: Vec<&Predicate>,
+    chunk_stage_preds: Vec<&DeletePredicate>,
     // These delete predicates are applied all chunks at their final stages
-    at_end_preds: Vec<&Predicate>,
+    at_end_preds: Vec<&DeletePredicate>,
     // Single chunk data
     lp_lines: Vec<&str>,
     // Table of the chunk
@@ -677,7 +730,7 @@ async fn make_chunk_with_deletes_at_different_stages(
 // function will created a much more complicated cases to handle
 async fn make_different_stage_chunks_with_deletes_scenario(
     data: Vec<ChunkData<'_>>,
-    preds: Vec<&Predicate>,
+    preds: Vec<&DeletePredicate>,
     table_name: &str,
     partition_key: &str,
 ) -> DbScenario {
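Editor's note: every scenario now spells the `DeletePredicate` literal out longhand where the old `PredicateBuilder` chained `.table(..).timestamp_range(..).add_expr(..)`. A hypothetical convenience constructor for the repeated `cpu` pattern, using only types that appear above and assuming the range bounds are `i64` (the `0 <= time < 32` comments confirm the range is half-open):

```rust
use data_types::timestamp::TimestampRange;
use predicate::{
    delete_expr::{DeleteExpr, Op, Scalar},
    delete_predicate::DeletePredicate,
};

/// Hypothetical helper: a delete on table `cpu` over `[start, end)`.
fn cpu_delete(start: i64, end: i64, exprs: Vec<DeleteExpr>) -> DeletePredicate {
    DeletePredicate {
        table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
        field_columns: None,
        partition_key: None,
        range: TimestampRange { start, end },
        exprs,
    }
}

// Mirrors OneDeleteSimpleExprOneChunk's predicate:
// let pred = cpu_delete(
//     0,
//     15,
//     vec![DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::F64((1.0).into()))],
// );
```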
diff --git a/server/src/db.rs b/server/src/db.rs
index b617349ff4..a2401b0506 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -39,7 +39,7 @@ use parquet_file::catalog::{
     prune::prune_history as prune_catalog_transaction_history,
 };
 use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
-use predicate::predicate::Predicate;
+use predicate::{delete_predicate::DeletePredicate, predicate::Predicate};
 use query::{
     exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
     QueryDatabase,
@@ -546,7 +546,7 @@ impl Db {
     pub async fn delete(
         self: &Arc<Self>,
         table_name: &str,
-        delete_predicate: Arc<Predicate>,
+        delete_predicate: Arc<DeletePredicate>,
     ) -> Result<()> {
         // collect delete predicates on preserved partitions for a catalog transaction
         let mut affected_persisted_chunks = vec![];
@@ -1263,8 +1263,10 @@ impl CatalogProvider for Db {
 
 pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData {
     let mut files = HashMap::new();
-    let mut delete_predicates: HashMap<usize, (Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)> =
-        Default::default();
+    let mut delete_predicates: HashMap<
+        usize,
+        (Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>),
+    > = Default::default();
 
     for chunk in catalog.chunks() {
         let guard = chunk.read();
@@ -1288,8 +1290,8 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
             || guard.is_in_lifecycle(ChunkLifecycleAction::Persisting)
         {
             for predicate in guard.delete_predicates() {
-                let predicate_ref: &Predicate = predicate.as_ref();
-                let addr = (predicate_ref as *const Predicate) as usize;
+                let predicate_ref: &DeletePredicate = predicate.as_ref();
+                let addr = (predicate_ref as *const DeletePredicate) as usize;
                 delete_predicates
                     .entry(addr)
                     .and_modify(|(_predicate, v)| v.push(guard.addr().clone().into()))
@@ -1415,10 +1417,8 @@ mod tests {
 
     use arrow::record_batch::RecordBatch;
     use bytes::Bytes;
-    use chrono::{DateTime, TimeZone};
-    use datafusion::logical_plan::{col, lit};
     use futures::{stream, StreamExt, TryStreamExt};
-    use predicate::predicate::PredicateBuilder;
+    use predicate::delete_expr::DeleteExpr;
     use tokio_util::sync::CancellationToken;
 
     use ::test_helpers::{assert_contains, maybe_start_logging};
@@ -1427,6 +1427,7 @@ mod tests {
         chunk_metadata::{ChunkAddr, ChunkStorage},
         database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
         partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
+        timestamp::TimestampRange,
         write_summary::TimestampSummary,
     };
     use entry::test_helpers::lp_to_entry;
@@ -3685,14 +3686,20 @@ mod tests {
         .unwrap();
 
         // ==================== do: delete ====================
-        let expr = col("selector").eq(lit(1i64));
-        let pred = Arc::new(
-            PredicateBuilder::new()
-                .table("cpu")
-                .timestamp_range(0, 1_000)
-                .add_expr(expr)
-                .build(),
-        );
+        let pred = Arc::new(DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange {
+                start: 0,
+                end: 1_000,
+            },
+            exprs: vec![DeleteExpr::new(
+                "selector".to_string(),
+                predicate::delete_expr::Op::Eq,
+                predicate::delete_expr::Scalar::I64(1),
+            )],
+        });
         db.delete("cpu", Arc::clone(&pred)).await.unwrap();
 
         // ==================== do: preserve another partition ====================
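Editor's note: `checkpoint_data_from_catalog` keys its map on the address behind the `Arc`, so a predicate shared by many chunks is recorded once with all affected chunk addresses attached. A condensed sketch of that grouping trick under simplified types (`ChunkKey` is a stand-in for `ChunkAddrWithoutDatabase`):

```rust
use std::{collections::HashMap, sync::Arc};

use predicate::delete_predicate::DeletePredicate;

/// Group chunk identifiers by the identity of their shared predicate.
/// Sketch only; `ChunkKey` stands in for the real chunk address type.
fn group_by_predicate_identity<ChunkKey>(
    pairs: Vec<(Arc<DeletePredicate>, ChunkKey)>,
) -> HashMap<usize, (Arc<DeletePredicate>, Vec<ChunkKey>)> {
    let mut grouped: HashMap<usize, (Arc<DeletePredicate>, Vec<ChunkKey>)> =
        Default::default();
    for (predicate, chunk) in pairs {
        // Clones of one `Arc` point at the same allocation, so the
        // pointed-to address is a stable identity key.
        let addr = (predicate.as_ref() as *const DeletePredicate) as usize;
        let entry = grouped
            .entry(addr)
            .or_insert_with(|| (Arc::clone(&predicate), vec![]));
        entry.1.push(chunk);
    }
    grouped
}
```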
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index 0cbf4ba3a1..aedf5a2e1f 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -16,7 +16,7 @@ use internal_types::{access::AccessRecorder, schema::Schema};
 use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk};
 use observability_deps::tracing::debug;
 use parquet_file::chunk::ParquetChunk;
-use predicate::predicate::Predicate;
+use predicate::delete_predicate::DeletePredicate;
 use read_buffer::RBChunk;
 use tracker::{TaskRegistration, TaskTracker};
 
@@ -80,7 +80,7 @@ pub struct ChunkMetadata {
     pub schema: Arc<Schema>,
 
     /// Delete predicates of this chunk
-    pub delete_predicates: Vec<Arc<Predicate>>,
+    pub delete_predicates: Vec<Arc<DeletePredicate>>,
 }
 
 /// Different memory representations of a frozen chunk.
@@ -311,7 +311,7 @@ impl CatalogChunk {
         time_of_last_write: DateTime<Utc>,
         schema: Arc<Schema>,
         metrics: ChunkMetrics,
-        delete_predicates: Vec<Arc<Predicate>>,
+        delete_predicates: Vec<Arc<DeletePredicate>>,
         order: ChunkOrder,
     ) -> Self {
         let stage = ChunkStage::Frozen {
@@ -346,7 +346,7 @@ impl CatalogChunk {
         time_of_first_write: DateTime<Utc>,
         time_of_last_write: DateTime<Utc>,
         metrics: ChunkMetrics,
-        delete_predicates: Vec<Arc<Predicate>>,
+        delete_predicates: Vec<Arc<DeletePredicate>>,
         order: ChunkOrder,
     ) -> Self {
         assert_eq!(chunk.table_name(), addr.table_name.as_ref());
@@ -473,7 +473,7 @@ impl CatalogChunk {
         }
     }
 
-    pub fn add_delete_predicate(&mut self, delete_predicate: Arc<Predicate>) {
+    pub fn add_delete_predicate(&mut self, delete_predicate: Arc<DeletePredicate>) {
         debug!(
             ?delete_predicate,
             "Input delete predicate to CatalogChunk add_delete_predicate"
@@ -498,7 +498,7 @@ impl CatalogChunk {
         }
     }
 
-    pub fn delete_predicates(&self) -> &[Arc<Predicate>] {
+    pub fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
         match &self.stage {
             ChunkStage::Open { mb_chunk: _ } => {
                 // no delete predicate for open chunk
@@ -686,13 +686,13 @@ impl CatalogChunk {
     ///
     /// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage
     /// (no-op) and will fail for other stages.
-    pub fn freeze_with_predicate(&mut self, delete_predicate: Arc<Predicate>) -> Result<()> {
+    pub fn freeze_with_predicate(&mut self, delete_predicate: Arc<DeletePredicate>) -> Result<()> {
         self.freeze_with_delete_predicates(vec![delete_predicate])
     }
 
     fn freeze_with_delete_predicates(
         &mut self,
-        delete_predicates: Vec<Arc<Predicate>>,
+        delete_predicates: Vec<Arc<DeletePredicate>>,
     ) -> Result<()> {
         match &self.stage {
             ChunkStage::Open { mb_chunk, .. } => {
@@ -906,7 +906,7 @@ impl CatalogChunk {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion::logical_plan::{col, lit};
+    use data_types::timestamp::TimestampRange;
     use entry::test_helpers::lp_to_entry;
     use mutable_buffer::chunk::ChunkMetrics as MBChunkMetrics;
@@ -916,7 +916,7 @@ mod tests {
             make_chunk as make_parquet_chunk_with_store, make_iox_object_store, TestSize,
         },
     };
-    use predicate::predicate::PredicateBuilder;
+    use predicate::delete_expr::DeleteExpr;
 
     #[test]
     fn test_new_open() {
@@ -1083,18 +1083,20 @@ mod tests {
         assert_eq!(del_preds.len(), 0);
 
         // Build delete predicate and expected output
-        let expr1 = col("city").eq(lit("Boston"));
-        let del_pred1 = PredicateBuilder::new()
-            .table("test")
-            .timestamp_range(1, 100)
-            .add_expr(expr1)
-            .build();
-        let mut expected_exprs1 = vec![];
-        let e = col("city").eq(lit("Boston"));
-        expected_exprs1.push(e);
+        let del_pred1 = Arc::new(DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["test".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 0, end: 100 },
+            exprs: vec![DeleteExpr::new(
+                "city".to_string(),
+                predicate::delete_expr::Op::Eq,
+                predicate::delete_expr::Scalar::String("Boston".to_string()),
+            )],
+        });
 
         // Add a delete predicate into a chunk the open chunk = delete simulation for open chunk
-        chunk.add_delete_predicate(Arc::new(del_pred1));
+        chunk.add_delete_predicate(Arc::clone(&del_pred1));
         // chunk must be in frozen stage now
         assert_eq!(chunk.stage().name(), "Frozen");
         // chunk must have a delete predicate
@@ -1102,26 +1104,22 @@ mod tests {
         assert_eq!(del_preds.len(), 1);
         // verify delete predicate value
         let pred = &del_preds[0];
-        if let Some(range) = pred.range {
-            assert_eq!(range.start, 1); // start time
-            assert_eq!(range.end, 100); // stop time
-        } else {
-            panic!("No time range set for delete predicate");
-        }
-        assert_eq!(pred.exprs, expected_exprs1);
+        assert_eq!(pred, &del_pred1);
 
         // let add more delete predicate = simulate second delete
         // Build delete predicate and expected output
-        let expr2 = col("cost").not_eq(lit(15));
-        let del_pred2 = PredicateBuilder::new()
-            .table("test")
-            .timestamp_range(20, 50)
-            .add_expr(expr2)
-            .build();
-        let mut expected_exprs2 = vec![];
-        let e = col("cost").not_eq(lit(15));
-        expected_exprs2.push(e);
-        chunk.add_delete_predicate(Arc::new(del_pred2));
+        let del_pred2 = Arc::new(DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["test".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 20, end: 50 },
+            exprs: vec![DeleteExpr::new(
+                "cost".to_string(),
+                predicate::delete_expr::Op::Ne,
+                predicate::delete_expr::Scalar::I64(15),
+            )],
+        });
+        chunk.add_delete_predicate(Arc::clone(&del_pred2));
         // chunk still must be in frozen stage now
         assert_eq!(chunk.stage().name(), "Frozen");
         // chunk must have 2 delete predicates
@@ -1129,13 +1127,7 @@ mod tests {
         assert_eq!(del_preds.len(), 2);
         // verify the second delete predicate value
         let pred = &del_preds[1];
-        if let Some(range) = pred.range {
-            assert_eq!(range.start, 20); // start time
-            assert_eq!(range.end, 50); // stop time
-        } else {
-            panic!("No time range set for delete predicate");
-        }
-        assert_eq!(pred.exprs, expected_exprs2);
+        assert_eq!(pred, &del_pred2);
     }
 
     fn make_mb_chunk(table_name: &str) -> MBChunk {
diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs
index 5dd0427542..0d7ef3698e 100644
--- a/server/src/db/catalog/partition.rs
+++ b/server/src/db/catalog/partition.rs
@@ -13,7 +13,7 @@ use observability_deps::tracing::info;
 use persistence_windows::{
     min_max_sequence::OptionalMinMaxSequence, persistence_windows::PersistenceWindows,
 };
-use predicate::predicate::Predicate;
+use predicate::delete_predicate::DeletePredicate;
 use snafu::{OptionExt, Snafu};
 use std::{collections::BTreeMap, fmt::Display, sync::Arc};
 use tracker::RwLock;
@@ -231,7 +231,7 @@ impl Partition {
         time_of_first_write: DateTime<Utc>,
         time_of_last_write: DateTime<Utc>,
         schema: Arc<Schema>,
-        delete_predicates: Vec<Arc<Predicate>>,
+        delete_predicates: Vec<Arc<DeletePredicate>>,
         chunk_order: ChunkOrder,
     ) -> (ChunkId, &Arc<RwLock<CatalogChunk>>) {
         let chunk_id = self.next_chunk_id();
@@ -273,7 +273,7 @@ impl Partition {
         chunk: Arc<ParquetChunk>,
         time_of_first_write: DateTime<Utc>,
         time_of_last_write: DateTime<Utc>,
-        delete_predicates: Vec<Arc<Predicate>>,
+        delete_predicates: Vec<Arc<DeletePredicate>>,
         chunk_order: ChunkOrder,
     ) -> &Arc<RwLock<CatalogChunk>> {
         assert_eq!(chunk.table_name(), self.table_name());
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index f844758f9a..eb042e37f3 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -18,7 +18,10 @@ use mutable_buffer::chunk::snapshot::ChunkSnapshot;
 use observability_deps::tracing::debug;
 use parquet_file::chunk::ParquetChunk;
 use partition_metadata::TableSummary;
-use predicate::predicate::{Predicate, PredicateMatch};
+use predicate::{
+    delete_predicate::DeletePredicate,
+    predicate::{Predicate, PredicateMatch},
+};
 use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
 use read_buffer::RBChunk;
 use snafu::{OptionExt, ResultExt, Snafu};
@@ -546,7 +549,7 @@ impl QueryChunkMeta for DbChunk {
     }
 
     // return a reference to delete predicates of the chunk
-    fn delete_predicates(&self) -> &[Arc<Predicate>] {
+    fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
         let pred = &self.meta.delete_predicates;
         debug!(?pred, "Delete predicate in DbChunk");
diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs
index 62fbdbf9ca..de795ac474 100644
--- a/server/src/db/lifecycle/compact.rs
+++ b/server/src/db/lifecycle/compact.rs
@@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
 use data_types::{chunk_metadata::ChunkOrder, job::Job};
 use lifecycle::LifecycleWriteGuard;
 use observability_deps::tracing::info;
-use predicate::predicate::Predicate;
+use predicate::delete_predicate::DeletePredicate;
 use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
 use std::{future::Future, sync::Arc};
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@@ -47,7 +47,7 @@ pub(crate) fn compact_chunks(
     let mut input_rows = 0;
     let mut time_of_first_write: Option<DateTime<Utc>> = None;
     let mut time_of_last_write: Option<DateTime<Utc>> = None;
-    let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
+    let mut delete_predicates: Vec<Arc<DeletePredicate>> = vec![];
     let mut min_order = ChunkOrder::MAX;
     let query_chunks = chunks
         .into_iter()
@@ -157,8 +157,8 @@ mod tests {
     use crate::db::test_helpers::write_lp;
     use crate::{db::test_helpers::write_lp_with_time, utils::make_db};
     use data_types::chunk_metadata::ChunkStorage;
+    use data_types::timestamp::TimestampRange;
     use lifecycle::{LockableChunk, LockablePartition};
-    use predicate::predicate::PredicateBuilder;
     use query::QueryDatabase;
 
     #[tokio::test]
@@ -235,13 +235,19 @@ mod tests {
         assert_eq!(partition_keys.len(), 1);
 
         // Cannot simply use empty predicate (#2687)
-        let predicate = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(0, 1_000)
-            .build();
+        let predicate = Arc::new(DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange {
+                start: 0,
+                end: 1_000,
+            },
+            exprs: vec![],
+        });
 
         // Delete everything
-        db.delete("cpu", Arc::new(predicate)).await.unwrap();
+        db.delete("cpu", predicate).await.unwrap();
 
         let chunk = db
             .compact_partition("cpu", partition_keys[0].as_str())
             .await
diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs
index 9f6d4830ca..b44fc61086 100644
--- a/server/src/db/lifecycle/persist.rs
+++ b/server/src/db/lifecycle/persist.rs
@@ -11,7 +11,7 @@ use data_types::{chunk_metadata::ChunkOrder, job::Job};
 use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition};
 use observability_deps::tracing::info;
 use persistence_windows::persistence_windows::FlushHandle;
-use predicate::predicate::Predicate;
+use predicate::delete_predicate::DeletePredicate;
 use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
 use std::{future::Future, sync::Arc};
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@@ -56,7 +56,7 @@ where
     let mut time_of_first_write: Option<DateTime<Utc>> = None;
     let mut time_of_last_write: Option<DateTime<Utc>> = None;
     let mut query_chunks = vec![];
-    let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
+    let mut delete_predicates: Vec<Arc<DeletePredicate>> = vec![];
     let mut min_order = ChunkOrder::MAX;
     for mut chunk in chunks {
         // Sanity-check
@@ -213,10 +213,9 @@ mod tests {
     use super::*;
     use crate::{db::test_helpers::write_lp, utils::TestDb, Db};
     use chrono::{TimeZone, Utc};
-    use data_types::chunk_metadata::ChunkStorage;
     use data_types::database_rules::LifecycleRules;
+    use data_types::{chunk_metadata::ChunkStorage, timestamp::TimestampRange};
     use lifecycle::{LockableChunk, LockablePartition};
-    use predicate::predicate::PredicateBuilder;
     use query::QueryDatabase;
     use std::{
         num::{NonZeroU32, NonZeroU64},
@@ -303,12 +302,15 @@ mod tests {
         let partition = db.partition("cpu", partition_key.as_str()).unwrap();
 
         // Delete first row
-        let predicate = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(0, 20)
-            .build();
+        let predicate = Arc::new(DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange { start: 0, end: 20 },
+            exprs: vec![],
+        });
 
-        db.delete("cpu", Arc::new(predicate)).await.unwrap();
+        db.delete("cpu", predicate).await.unwrap();
 
         // Try to persist first write but it has been deleted
         let maybe_chunk = db
@@ -366,12 +368,18 @@ mod tests {
         assert_eq!(chunks[1].row_count, 2);
 
         // Delete everything
-        let predicate = PredicateBuilder::new()
-            .table("cpu")
-            .timestamp_range(0, 1000)
-            .build();
+        let predicate = Arc::new(DeletePredicate {
+            table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
+            field_columns: None,
+            partition_key: None,
+            range: TimestampRange {
+                start: 0,
+                end: 1_000,
+            },
+            exprs: vec![],
+        });
 
-        db.delete("cpu", Arc::new(predicate)).await.unwrap();
+        db.delete("cpu", predicate).await.unwrap();
 
         // Try to persist third set of writes
         let maybe_chunk = db
diff --git a/server/src/db/lifecycle/write.rs b/server/src/db/lifecycle/write.rs
index fe937e7e77..5029311e8c 100644
--- a/server/src/db/lifecycle/write.rs
+++ b/server/src/db/lifecycle/write.rs
@@ -26,6 +26,7 @@ use persistence_windows::{
     checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder},
     persistence_windows::FlushHandle,
 };
+use predicate::predicate::Predicate;
 use query::{QueryChunk, QueryChunkMeta};
 use snafu::ResultExt;
 use std::{future::Future, sync::Arc};
@@ -92,12 +93,13 @@ where
         collect_checkpoints(flush_handle.checkpoint(), &db.catalog);
 
     // Get RecordBatchStream of data from the read buffer chunk
+    let del_preds: Vec<Arc<Predicate>> = db_chunk
+        .delete_predicates()
+        .iter()
+        .map(|pred| Arc::new(pred.as_ref().clone().into()))
+        .collect();
     let stream = db_chunk
-        .read_filter(
-            &Default::default(),
-            Selection::All,
-            db_chunk.delete_predicates(),
-        )
+        .read_filter(&Default::default(), Selection::All, &del_preds)
         .expect("read filter should be infallible");
 
     // check that the upcoming state change will very likely succeed
diff --git a/server/src/db/load.rs b/server/src/db/load.rs
index b06f06e1ea..1931edc39a 100644
--- a/server/src/db/load.rs
+++ b/server/src/db/load.rs
@@ -15,7 +15,7 @@ use parquet_file::{
     chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
 };
 use persistence_windows::checkpoint::{ReplayPlan, ReplayPlanner};
-use predicate::predicate::Predicate;
+use predicate::delete_predicate::DeletePredicate;
 use snafu::{ResultExt, Snafu};
 use std::sync::Arc;
 
@@ -228,7 +228,7 @@ impl CatalogState for Loader {
 
         // Delete predicates are loaded explicitely via `CatalogState::delete_predicates` AFTER the chunk is added, so
         // we leave this list empty (for now).
-        let delete_predicates: Vec<Arc<Predicate>> = vec![];
+        let delete_predicates: Vec<Arc<DeletePredicate>> = vec![];
 
         partition.insert_object_store_only_chunk(
             iox_md.chunk_id,
@@ -276,7 +276,7 @@ impl CatalogState for Loader {
 
     fn delete_predicate(
         &mut self,
-        predicate: Arc<Predicate>,
+        predicate: Arc<DeletePredicate>,
         chunks: Vec<ChunkAddrWithoutDatabase>,
     ) {
         for addr in chunks {
diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs
index 928b7941d3..d56fdc463e 100644
--- a/src/influxdb_ioxd/http.rs
+++ b/src/influxdb_ioxd/http.rs
@@ -26,7 +26,7 @@ use data_types::{
 };
 use influxdb_iox_client::format::QueryOutputFormat;
 use influxdb_line_protocol::parse_lines;
-use predicate::predicate::{parse_delete, ParseDeletePredicate};
+use predicate::delete_predicate::{parse_delete, ParseDeletePredicate};
 use query::exec::ExecutionContextProvider;
 use server::{ApplicationState, ConnectionManager, Error, Server as AppServer};
 
@@ -163,13 +163,13 @@ pub enum ApplicationError {
 
     #[snafu(display("Error parsing delete {}: {}", input, source))]
     ParsingDelete {
-        source: predicate::predicate::Error,
+        source: predicate::delete_predicate::Error,
         input: String,
     },
 
     #[snafu(display("Error building delete predicate {}: {}", input, source))]
     BuildingDeletePredicate {
-        source: predicate::predicate::Error,
+        source: predicate::delete_predicate::Error,
         input: String,
     },
diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs
index 30b8d13678..6a5fa707dd 100644
--- a/src/influxdb_ioxd/rpc/management.rs
+++ b/src/influxdb_ioxd/rpc/management.rs
@@ -7,7 +7,7 @@ use data_types::chunk_metadata::ChunkId;
 use data_types::{server_id::ServerId, DatabaseName};
 use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
 use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
-use predicate::predicate::ParseDeletePredicate;
+use predicate::delete_predicate::ParseDeletePredicate;
 use query::QueryDatabase;
 use server::rules::ProvidedDatabaseRules;
 use server::{ApplicationState, ConnectionManager, Error, Server};
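Editor's note: taken together, the callers migrate on a simple rule: everything delete-related now comes from `predicate::delete_predicate` (and `predicate::delete_expr`), while the general query predicate stays where it was. In import form, using only names that appear in this diff:

```rust
// Before: delete parsing lived beside the general predicate.
// use predicate::predicate::{parse_delete, ParseDeletePredicate, Predicate, PredicateMatch};

// After: delete predicates and expressions have their own modules; the
// general query predicate is unchanged.
use predicate::{
    delete_expr::DeleteExpr,
    delete_predicate::{parse_delete, DeletePredicate, ParseDeletePredicate},
    predicate::{Predicate, PredicateMatch},
};
```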