Merge pull request #2715 from influxdata/crepererum/in_mem_expr_part2

refactor: introduce `DeletePredicate`
kodiakhq[bot] 2021-10-04 15:06:54 +00:00 committed by GitHub
commit 57ed72cd40
24 changed files with 1059 additions and 937 deletions
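For orientation, the core of this refactor is the new self-contained `DeletePredicate` type (full definition appears in the new `delete_predicate.rs` file further down). A minimal sketch of constructing one, mirroring the test scenarios in this diff; the function name is illustrative only, not part of the PR:

use data_types::timestamp::TimestampRange;
use predicate::delete_expr::{DeleteExpr, Op, Scalar};
use predicate::delete_predicate::DeletePredicate;

// "delete from cpu where bar = 1" restricted to 0 <= time < 32
fn sketch_delete_predicate() -> DeletePredicate {
    DeletePredicate {
        table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
        field_columns: None,
        partition_key: None,
        range: TimestampRange { start: 0, end: 32 },
        exprs: vec![DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::I64(1))],
    }
}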


@@ -6,7 +6,7 @@ use iox_object_store::{IoxObjectStore, ParquetFilePath};
use object_store::{ObjectStore, ObjectStoreApi};
use observability_deps::tracing::info;
use parking_lot::Mutex;
use predicate::predicate::Predicate;
use predicate::delete_predicate::DeletePredicate;
use snafu::{ResultExt, Snafu};
use crate::catalog::{
@@ -148,7 +148,7 @@ impl CatalogState for TracerCatalogState {
fn delete_predicate(
&mut self,
_predicate: Arc<Predicate>,
_predicate: Arc<DeletePredicate>,
_chunks: Vec<ChunkAddrWithoutDatabase>,
) {
// No need to track delete predicates, because the cleanup's job is to remove unreferenced parquet files. Delete


@@ -20,7 +20,7 @@ use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
use object_store::{ObjectStore, ObjectStoreApi};
use observability_deps::tracing::{info, warn};
use parking_lot::RwLock;
use predicate::predicate::Predicate;
use predicate::delete_predicate::DeletePredicate;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
collections::{
@@ -147,11 +147,6 @@ pub enum Error {
source: crate::catalog::interface::CatalogStateRemoveError,
},
#[snafu(display("Cannot serialize predicate: {}", source))]
CannotSerializePredicate {
source: predicate::serialize::SerializeError,
},
#[snafu(display("Delete predicate missing"))]
DeletePredicateMissing,
@@ -860,7 +855,7 @@ impl<'c> TransactionHandle<'c> {
/// Register new delete predicate.
pub fn delete_predicate(
&mut self,
predicate: &Predicate,
predicate: &DeletePredicate,
chunks: &[ChunkAddrWithoutDatabase],
) -> Result<()> {
self.transaction
@@ -868,10 +863,7 @@
.expect("transaction handle w/o transaction?!")
.record_action(proto::transaction::action::Action::DeletePredicate(
proto::DeletePredicate {
predicate: Some(
predicate::serialize::serialize(predicate)
.context(CannotSerializePredicate)?,
),
predicate: Some(predicate::serialize::serialize(predicate)),
chunks: chunks
.iter()
.map(|chunk| proto::ChunkAddr {
@@ -992,16 +984,13 @@ impl<'c> CheckpointHandle<'c> {
}
fn create_actions_for_delete_predicates(
delete_predicates: Vec<(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)>,
delete_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>,
) -> Result<Vec<proto::transaction::Action>, Error> {
delete_predicates
.into_iter()
.map(|(predicate, chunks)| {
let action = proto::DeletePredicate {
predicate: Some(
predicate::serialize::serialize(&predicate)
.context(CannotSerializePredicate)?,
),
predicate: Some(predicate::serialize::serialize(&predicate)),
chunks: chunks
.iter()
.map(|chunk| proto::ChunkAddr {


@@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc};
use data_types::chunk_metadata::{ChunkAddr, ChunkId};
use iox_object_store::{IoxObjectStore, ParquetFilePath};
use predicate::predicate::Predicate;
use predicate::delete_predicate::DeletePredicate;
use snafu::Snafu;
use crate::metadata::IoxParquetMetaData;
@@ -118,7 +118,7 @@ pub trait CatalogState {
/// The delete predicate will only be applied to the given chunks (by table name, partition key, and chunk ID).
fn delete_predicate(
&mut self,
predicate: Arc<Predicate>,
predicate: Arc<DeletePredicate>,
chunks: Vec<ChunkAddrWithoutDatabase>,
);
}
@@ -141,6 +141,6 @@ pub struct CheckpointData {
/// This must only contain chunks that are still present in the catalog. Predicates that do not have any chunks
/// attached should be left out.
///
/// The vector itself must be sorted by [`Predicate`]. The chunks list must also be sorted.
pub delete_predicates: Vec<(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)>,
/// The vector itself must be sorted by [`DeletePredicate`]. The chunks list must also be sorted.
pub delete_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>,
}


@@ -7,9 +7,12 @@ use std::{
sync::Arc,
};
use data_types::chunk_metadata::ChunkId;
use data_types::{chunk_metadata::ChunkId, timestamp::TimestampRange};
use iox_object_store::{IoxObjectStore, ParquetFilePath, TransactionFilePath};
use predicate::predicate::Predicate;
use predicate::{
delete_expr::{DeleteExpr, Op, Scalar},
delete_predicate::DeletePredicate,
};
use snafu::ResultExt;
use crate::{
@@ -41,7 +44,7 @@ pub struct Partition {
#[derive(Clone, Debug)]
pub struct Chunk {
pub parquet_info: CatalogParquetInfo,
pub delete_predicates: Vec<Arc<Predicate>>,
pub delete_predicates: Vec<Arc<DeletePredicate>>,
}
/// In-memory catalog state, for testing.
@@ -74,16 +77,16 @@ impl TestCatalogState {
}
/// Return all delete predicates in this catalog, along with the chunks they apply to.
pub fn delete_predicates(&self) -> Vec<(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)> {
let mut predicates: HashMap<usize, (Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)> =
pub fn delete_predicates(&self) -> Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> {
let mut predicates: HashMap<usize, (Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> =
Default::default();
for (table_name, table) in &self.tables {
for (partition_key, partition) in &table.partitions {
for (chunk_id, chunk) in &partition.chunks {
for predicate in &chunk.delete_predicates {
let predicate_ref: &Predicate = predicate.as_ref();
let addr = (predicate_ref as *const Predicate) as usize;
let predicate_ref: &DeletePredicate = predicate.as_ref();
let addr = (predicate_ref as *const DeletePredicate) as usize;
let pred_chunk_closure = || ChunkAddrWithoutDatabase {
table_name: Arc::clone(table_name),
partition_key: Arc::clone(partition_key),
@@ -197,7 +200,7 @@ impl CatalogState for TestCatalogState {
fn delete_predicate(
&mut self,
predicate: Arc<Predicate>,
predicate: Arc<DeletePredicate>,
chunks: Vec<ChunkAddrWithoutDatabase>,
) {
for addr in chunks {
@@ -256,7 +259,8 @@ where
// The expected state of the catalog
let mut expected_files: HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)> =
HashMap::new();
let mut expected_predicates: Vec<(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)> = vec![];
let mut expected_predicates: Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)> =
vec![];
assert_checkpoint(&state, &f, &expected_files, &expected_predicates);
// add files
@@ -591,7 +595,7 @@ fn assert_checkpoint<S, F>(
state: &S,
f: &F,
expected_files: &HashMap<ChunkId, (ParquetFilePath, Arc<IoxParquetMetaData>)>,
expected_predicates: &[(Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)],
expected_predicates: &[(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)],
) where
F: Fn(&S) -> CheckpointData,
{
@@ -634,28 +638,20 @@ fn get_sorted_keys<'a>(
}
/// Helper to create a simple delete predicate.
pub fn create_delete_predicate(table_name: &str, value: i64) -> Arc<Predicate> {
use datafusion::{
logical_plan::{Column, Expr, Operator},
scalar::ScalarValue,
};
Arc::new(Predicate {
pub fn create_delete_predicate(table_name: &str, value: i64) -> Arc<DeletePredicate> {
Arc::new(DeletePredicate {
table_names: Some(
IntoIterator::into_iter([table_name.to_string(), format!("not_{}", table_name)])
.collect(),
),
field_columns: None,
partition_key: None,
range: None,
exprs: vec![Expr::BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
})),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(value)))),
}],
range: TimestampRange { start: 11, end: 22 },
exprs: vec![DeleteExpr::new(
"foo".to_string(),
Op::Eq,
Scalar::I64(value),
)],
})
}
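A quick sketch of what the rewritten helper yields, based on its body above (test name ours):

#[test]
fn sketch_create_delete_predicate() {
    let pred = create_delete_predicate("cpu", 42);
    let tables = pred.table_names.as_ref().unwrap();
    assert!(tables.contains("cpu") && tables.contains("not_cpu"));
    assert_eq!((pred.range.start, pred.range.end), (11, 22));
}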


@@ -23,6 +23,10 @@ pub struct DeleteExpr {
}
impl DeleteExpr {
pub fn new(column: String, op: Op, scalar: Scalar) -> Self {
Self { column, op, scalar }
}
/// Column (w/o table name).
pub fn column(&self) -> &str {
&self.column
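The new constructor pairs with the existing accessors; a small sketch using the `Op` and `Scalar` variants that appear elsewhere in this diff (function name ours):

use predicate::delete_expr::{DeleteExpr, Op, Scalar};

fn sketch_delete_expr() {
    let expr = DeleteExpr::new("host".to_string(), Op::Ne, Scalar::String("a".to_string()));
    assert_eq!(expr.column(), "host");
}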


@@ -0,0 +1,736 @@
use std::{collections::BTreeSet, convert::TryInto};
use chrono::DateTime;
use data_types::timestamp::TimestampRange;
use datafusion::logical_plan::{lit, Column, Expr, Operator};
use snafu::{ResultExt, Snafu};
use sqlparser::{
ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},
dialect::GenericDialect,
parser::Parser,
};
use crate::delete_expr::DeleteExpr;
const FLUX_TABLE: &str = "_measurement";
// Parse Delete Predicates
/// Parse Error
#[derive(Debug, Snafu)]
pub enum Error {
/// Invalid time format
#[snafu(display("Invalid timestamp: {}", value))]
InvalidTimestamp { value: String },
/// Invalid time range
#[snafu(display("Invalid time range: ({}, {})", start, stop))]
InvalidTimeRange { start: String, stop: String },
/// Predicate syntax error
#[snafu(display("Invalid predicate syntax: ({})", value))]
InvalidSyntax { value: String },
/// Predicate semantics error
#[snafu(display("Invalid predicate semantics: ({})", value))]
InvalidSemantics { value: String },
/// Predicate includes a non-supported expression
#[snafu(display("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_name != literal': ({})", value))]
NotSupportPredicate { value: String },
#[snafu(display(r#"Unable to parse delete string '{}'"#, value))]
DeleteInvalid {
source: serde_json::Error,
value: String,
},
#[snafu(display(
r#"Invalid key which is either 'start', 'stop', or 'predicate': '{}'"#,
value
))]
DeleteKeywordInvalid { value: String },
#[snafu(display(r#"Invalid timestamp or predicate value: '{}'"#, value))]
DeleteValueInvalid { value: String },
#[snafu(display(r#"Invalid JSON format of delete string '{}'"#, value))]
DeleteObjectInvalid { value: String },
#[snafu(display(r#"Invalid table name in delete '{}'"#, value))]
DeleteTableInvalid { value: String },
#[snafu(display(r#"Delete must include a start time and a stop time'{}'"#, value))]
DeleteStartStopInvalid { value: String },
}
/// Result type for Parser Client
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Represents a parsed delete predicate for evaluation by the InfluxDB IOx
/// query engine.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct DeletePredicate {
/// Optional table restriction. If present, restricts the results
/// to only tables whose names are in `table_names`
pub table_names: Option<BTreeSet<String>>,
/// Optional field restriction. If present, restricts the results to only
/// tables which have *at least one* of the fields in field_columns.
pub field_columns: Option<BTreeSet<String>>,
/// Optional partition key filter
pub partition_key: Option<String>,
/// Only rows within this range are included in
/// results. Other rows are excluded.
pub range: TimestampRange,
/// Optional arbitrary predicates, represented as list of
/// expressions applied as a logical conjunction (aka they
/// are 'AND'ed together). Only rows that evaluate to TRUE for all
/// these expressions should be returned. Other rows are excluded
/// from the results.
pub exprs: Vec<DeleteExpr>,
}
impl From<DeletePredicate> for crate::predicate::Predicate {
fn from(pred: DeletePredicate) -> Self {
Self {
table_names: pred.table_names,
field_columns: pred.field_columns,
partition_key: pred.partition_key,
range: Some(pred.range),
exprs: pred.exprs.into_iter().map(|expr| expr.into()).collect(),
}
}
}
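Where the query engine still needs a full `Predicate`, this `From` impl is the bridge; a hedged sketch of the lowering (function name ours):

use predicate::{delete_predicate::DeletePredicate, predicate::Predicate};

fn lower_for_query(pred: DeletePredicate) -> Predicate {
    // the mandatory TimestampRange widens to Option<TimestampRange>, and each
    // DeleteExpr is lowered back into a DataFusion Expr for the query engine
    pred.into()
}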
/// Parser for Delete predicate and time range
#[derive(Debug, Clone)]
pub struct ParseDeletePredicate {
pub start_time: i64,
pub stop_time: i64,
// conjunctive predicate of binary expressions of = or !=
pub predicate: Vec<DeleteExpr>,
}
impl ParseDeletePredicate {
/// Create a ParseDeletePredicate
pub fn new(start_time: i64, stop_time: i64, predicate: Vec<DeleteExpr>) -> Self {
Self {
start_time,
stop_time,
predicate,
}
}
/// Parse and convert the strings of the delete gRPC API into a ParseDeletePredicate to send to the server
pub fn try_new(start: &str, stop: &str, predicate: &str) -> Result<Self> {
// parse and check time range
let (start_time, stop_time) = Self::parse_time_range(start, stop)?;
// Parse the predicate
let delete_exprs = Self::parse_predicate(predicate)?;
Ok(Self::new(start_time, stop_time, delete_exprs))
}
/// Parse the predicate and convert it into a list of DeleteExprs.
/// A delete predicate is a conjunction of binary expressions of the
/// form 'column = constant' or 'column != constant'.
///
pub fn parse_predicate(predicate: &str) -> Result<Vec<DeleteExpr>> {
if predicate.is_empty() {
return Ok(vec![]);
}
// "DELETE FROM table_name WHERE predicate"
// The table name can be anything; it is only there so sqlparser sees valid SQL syntax
let mut sql = "DELETE FROM table_name WHERE ".to_string();
sql.push_str(predicate);
// parse the delete sql
let dialect = GenericDialect {};
let ast = Parser::parse_sql(&dialect, sql.as_str());
match ast {
Err(parse_err) => {
let error_str = format!("{}, {}", predicate, parse_err);
Err(Error::InvalidSyntax { value: error_str })
}
Ok(mut stmt) => {
if stmt.len() != 1 {
return Err(Error::InvalidSemantics {
value: predicate.to_string(),
});
}
let stmt = stmt.pop();
match stmt {
Some(Statement::Delete {
table_name: _,
selection: Some(expr),
..
}) => {
// split this expr into smaller binary if any
let mut exprs = vec![];
let split = Self::split_members(&expr, &mut exprs);
if !split {
return Err(Error::NotSupportPredicate {
value: predicate.to_string(),
});
}
Ok(exprs)
}
_ => Err(Error::InvalidSemantics {
value: predicate.to_string(),
}),
}
}
}
}
pub fn build_delete_predicate(
start_time: String,
stop_time: String,
predicate: String,
) -> Result<DeletePredicate, Error> {
// parse time range and the predicate
let parse_delete_pred = ParseDeletePredicate::try_new(
start_time.as_str(),
stop_time.as_str(),
predicate.as_str(),
)?;
Ok(parse_delete_pred.into())
}
/// Recursively split all "AND" expressions into smaller ones
/// Example: "A AND B AND C" => [A, B, C]
/// Returns false unless all of them are ANDs of binary expressions of
/// the form "column_name = literal" or "column_name != literal"
///
/// The split expressions are converted into DeleteExprs
pub fn split_members(predicate: &SqlParserExpr, predicates: &mut Vec<DeleteExpr>) -> bool {
// The code below is built to be compatible with
// https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go
match predicate {
SqlParserExpr::BinaryOp {
left,
op: BinaryOperator::And,
right,
} => {
if !Self::split_members(left, predicates) {
return false;
}
if !Self::split_members(right, predicates) {
return false;
}
}
SqlParserExpr::BinaryOp { left, op, right } => {
// Verify Operator
let op = match op {
BinaryOperator::Eq => Operator::Eq,
BinaryOperator::NotEq => Operator::NotEq,
_ => return false,
};
// verify if left is identifier (column name)
let column = match &**left {
SqlParserExpr::Identifier(Ident {
value,
quote_style: _, // all quotes are ignored as done in idpe
}) => Expr::Column(Column {
relation: None,
name: value.to_string(),
}),
_ => return false, // not a column name
};
// verify if right is a literal or an identifier (e.g column name)
let value = match &**right {
SqlParserExpr::Identifier(Ident {
value,
quote_style: _,
}) => lit(value.to_string()),
SqlParserExpr::Value(Value::DoubleQuotedString(value)) => {
lit(value.to_string())
}
SqlParserExpr::Value(Value::SingleQuotedString(value)) => {
lit(value.to_string())
}
SqlParserExpr::Value(Value::NationalStringLiteral(value)) => {
lit(value.to_string())
}
SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()),
SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::<i64>() {
Ok(v) => lit(v),
Err(_) => lit(v.parse::<f64>().unwrap()),
},
SqlParserExpr::Value(Value::Boolean(v)) => lit(*v),
_ => return false, // not a literal
};
let expr = Expr::BinaryExpr {
left: Box::new(column),
op,
right: Box::new(value),
};
let expr: Result<DeleteExpr, _> = expr.try_into();
match expr {
Ok(expr) => {
predicates.push(expr);
}
Err(_) => {
// cannot convert
return false;
}
}
}
_ => return false,
}
true
}
/// Parse a time string and return its value in nanoseconds
pub fn parse_time(input: &str) -> Result<i64> {
// The input can be an RFC3339 timestamp ending with Z, such as 1970-01-01T00:00:00Z
// See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame
let datetime_result = DateTime::parse_from_rfc3339(input);
match datetime_result {
Ok(datetime) => Ok(datetime.timestamp_nanos()),
Err(timestamp_err) => {
// See if it is in nanosecond form
let time_result = input.parse::<i64>();
match time_result {
Ok(nano) => Ok(nano),
Err(nano_err) => {
// wrong format, return both errors
let error_str = format!("{}, {}", timestamp_err, nano_err);
Err(Error::InvalidTimestamp { value: error_str })
}
}
}
}
}
/// Parse a time range [start, stop]
pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> {
let start_time = Self::parse_time(start)?;
let stop_time = Self::parse_time(stop)?;
if start_time > stop_time {
return Err(Error::InvalidTimeRange {
start: start.to_string(),
stop: stop.to_string(),
});
}
Ok((start_time, stop_time))
}
}
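Both accepted timestamp forms, RFC3339 and raw nanoseconds, go through `parse_time`; a sketch based on the tests below (function name ours):

use predicate::delete_predicate::ParseDeletePredicate;

fn sketch_parse_times() {
    assert_eq!(ParseDeletePredicate::parse_time("1970-01-01T00:00:00Z").unwrap(), 0);
    assert_eq!(ParseDeletePredicate::parse_time("200").unwrap(), 200);
    // start must not exceed stop
    assert_eq!(ParseDeletePredicate::parse_time_range("0", "200").unwrap(), (0, 200));
}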
impl From<ParseDeletePredicate> for DeletePredicate {
fn from(pred: ParseDeletePredicate) -> Self {
Self {
table_names: None,
field_columns: None,
partition_key: None,
range: TimestampRange {
start: pred.start_time,
end: pred.stop_time,
},
exprs: pred.predicate,
}
}
}
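End to end, the three gRPC strings become a `DeletePredicate` via `build_delete_predicate` plus the `From` impl above; a sketch (function name ours):

use predicate::delete_predicate::{DeletePredicate, ParseDeletePredicate};

fn sketch_build() {
    let pred: DeletePredicate = ParseDeletePredicate::build_delete_predicate(
        "11".to_string(),
        "22".to_string(),
        "cost != 100".to_string(),
    )
    .unwrap();
    // table_names, field_columns, and partition_key all start out as None
    assert_eq!((pred.range.start, pred.range.end), (11, 22));
}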
// Note that this struct and its functions are used to parse FLUX DELETE,
// https://docs.influxdata.com/influxdb/v2.0/write-data/delete-data/, which happens before
// the parsing of timestamps and sql predicate. The examples below will show FLUX DELETE's syntax which is
// different from SQL syntax so we need this extra parsing step before invoking sqlparser to parse the
// sql-format predicates and timestamps
#[derive(Debug, Default, PartialEq, Clone)]
/// data of a parsed delete
pub struct ParsedDelete {
/// Empty string, "", if no table specified
pub table_name: String,
pub start_time: String,
pub stop_time: String,
pub predicate: String,
}
/// Return parsed data of an influx delete:
/// A few input examples and their parsed results:
/// {"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
/// => table_name="mytable", start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=\"Orient.local\"""
/// {"predicate":"host=Orient.local and val != 50","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
/// => start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=Orient.local and val != 50"
pub fn parse_delete(input: &str) -> Result<ParsedDelete> {
let parsed_obj: serde_json::Value =
serde_json::from_str(input).context(DeleteInvalid { value: input })?;
let mut parsed_delete = ParsedDelete::default();
if let serde_json::Value::Object(items) = parsed_obj {
for item in items {
// The value must be type String
if let Some(val) = item.1.as_str() {
match item.0.to_lowercase().as_str() {
"start" => parsed_delete.start_time = val.to_string(),
"stop" => parsed_delete.stop_time = val.to_string(),
"predicate" => parsed_delete.predicate = val.to_string(),
_ => {
return Err(Error::DeleteKeywordInvalid {
value: input.to_string(),
})
}
}
} else {
return Err(Error::DeleteValueInvalid {
value: input.to_string(),
});
}
}
} else {
return Err(Error::DeleteObjectInvalid {
value: input.to_string(),
});
}
// Start or stop is empty
if parsed_delete.start_time.is_empty() || parsed_delete.stop_time.is_empty() {
return Err(Error::DeleteStartStopInvalid {
value: input.to_string(),
});
}
// Extract table from the predicate if any
if parsed_delete.predicate.contains(FLUX_TABLE) {
// since the predicate is a conjunctive expression, split it on "and"
let predicate = parsed_delete
.predicate
.replace(" AND ", " and ")
.replace(" ANd ", " and ")
.replace(" And ", " and ")
.replace(" AnD ", " and ");
let split: Vec<&str> = predicate.split("and").collect();
let mut predicate_no_table = "".to_string();
for s in split {
if s.contains(FLUX_TABLE) {
// This should be in form "_measurement = <your_table_name>"
// keep only <your_table_name> by replacing the rest with ""
let table_name = s
.replace(FLUX_TABLE, "")
.replace("=", "")
.trim()
.to_string();
// Do not support white spaces in table name
if table_name.contains(' ') {
return Err(Error::DeleteTableInvalid {
value: input.to_string(),
});
}
parsed_delete.table_name = table_name;
} else {
// This is a normal column comparison, put it back to send to sqlparser later
if !predicate_no_table.is_empty() {
predicate_no_table.push_str(" and ")
}
predicate_no_table.push_str(s.trim());
}
}
parsed_delete.predicate = predicate_no_table;
}
Ok(parsed_delete)
}
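A sketch of the Flux JSON path, mirroring `test_parse_delete_full` below (function name ours):

use predicate::delete_predicate::parse_delete;

fn sketch_parse_delete() {
    let input = r#"{"predicate":"_measurement=cpu AND host=\"a\"","start":"1970-01-01T00:00:00Z","stop":"200"}"#;
    let parsed = parse_delete(input).unwrap();
    assert_eq!(parsed.table_name, "cpu"); // extracted from _measurement=cpu
    assert_eq!(parsed.predicate, r#"host="a""#); // remainder is handed to sqlparser
}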
#[cfg(test)]
mod tests {
use crate::delete_expr::{Op, Scalar};
use super::*;
#[test]
fn test_time_range_valid() {
let start = r#"100"#;
let stop = r#"100"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (100, 100);
assert_eq!(result, expected);
let start = r#"100"#;
let stop = r#"200"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (100, 200);
assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
let stop = r#"1970-01-01T00:00:00Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 0);
assert_eq!(result, expected);
// let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
// let stop = r#"now()"#; // -- Not working. Need to find a way to test this
// let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
// let expected = (0, 0);
// assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#;
let stop = r#"100"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 100);
assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#;
let stop = r#"1970-01-01T00:01:00Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 60000000000);
assert_eq!(result, expected);
}
#[test]
fn test_time_range_invalid() {
let start = r#"100"#;
let stop = r#"-100"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"100"#;
let stop = r#"50"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"100"#;
let stop = r#"1970-01-01T00:00:00Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"1971-09-01T00:00:10Z"#;
let stop = r#"1971-09-01T00:00:05Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
}
#[test]
fn test_parse_timestamp() {
let input = r#"123"#;
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 123);
// must parse time
let input = r#"1970-01-01T00:00:00Z"#;
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 0);
let input = r#"1971-02-01T15:30:21Z"#;
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 34270221000000000);
}
#[test]
fn test_parse_timestamp_negative() {
let input = r#"-123"#;
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, -123);
}
#[test]
fn test_parse_timestamp_invalid() {
let input = r#"123gdb"#;
ParseDeletePredicate::parse_time(input).unwrap_err();
let input = r#"1970-01-01T00:00:00"#;
ParseDeletePredicate::parse_time(input).unwrap_err();
// looks like a 1971 timestamp, but the missing 'T' separator makes it unparseable
let input = r#"1971-02-01:30:21Z"#;
ParseDeletePredicate::parse_time(input).unwrap_err();
}
#[test]
fn test_parse_timestamp_out_of_range() {
let input = r#"99999999999999999999999999999999"#;
let time = ParseDeletePredicate::parse_time(input);
assert!(time.is_err());
}
#[test]
fn test_parse_predicate() {
let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#;
let result = ParseDeletePredicate::parse_predicate(pred).unwrap();
println!("{:#?}", result);
let expected = vec![
DeleteExpr::new(
"city".to_string(),
Op::Eq,
Scalar::String("Boston".to_string()),
),
DeleteExpr::new("cost".to_string(), Op::Ne, Scalar::I64(100)),
DeleteExpr::new(
"state".to_string(),
Op::Ne,
Scalar::String("MA".to_string()),
),
DeleteExpr::new("temp".to_string(), Op::Eq, Scalar::F64((87.5).into())),
];
assert_eq!(result, expected)
}
#[test]
fn test_parse_predicate_invalid() {
let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"cost > 100"#; // >
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"cost <= 100"#; // <
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"cost gt 100"#; // >
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"city = cost = 100"#; // >
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
}
#[test]
fn test_full_delete_pred() {
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
let stop = r#"200"#;
let pred = r#"cost != 100"#;
let result = ParseDeletePredicate::try_new(start, stop, pred).unwrap();
assert_eq!(result.start_time, 0);
assert_eq!(result.stop_time, 200);
let expected = vec![DeleteExpr::new(
"cost".to_string(),
Op::Ne,
Scalar::I64(100),
)];
assert_eq!(result.predicate, expected);
}
#[test]
fn test_full_delete_pred_invalid_time_range() {
let start = r#"100"#;
let stop = r#"50"#;
let pred = r#"cost != 100"#;
let result = ParseDeletePredicate::try_new(start, stop, pred);
assert!(result.is_err());
}
#[test]
fn test_full_delete_pred_invalid_pred() {
let start = r#"100"#;
let stop = r#"200"#;
let pred = r#"cost > 100"#;
let result = ParseDeletePredicate::try_new(start, stop, pred);
assert!(result.is_err());
}
#[test]
fn test_parse_delete_full() {
let delete_str = r#"{"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
let expected = ParsedDelete {
table_name: "mytable".to_string(),
predicate: "host=\"Orient.local\"".to_string(),
start_time: "1970-01-01T00:00:00Z".to_string(),
stop_time: "2070-01-02T00:00:00Z".to_string(),
};
let result = parse_delete(delete_str).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_parse_delete_no_table() {
let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#;
let expected = ParsedDelete {
table_name: "".to_string(),
predicate: "host=\"Orient.local\"".to_string(),
start_time: "1970-01-01T00:00:00Z".to_string(),
stop_time: "2070-01-02T00:00:00Z".to_string(),
};
let result = parse_delete(delete_str).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_parse_delete_empty_predicate() {
let delete_str =
r#"{"start":"1970-01-01T00:00:00Z","predicate":"","stop":"2070-01-02T00:00:00Z"}"#;
let expected = ParsedDelete {
table_name: "".to_string(),
predicate: "".to_string(),
start_time: "1970-01-01T00:00:00Z".to_string(),
stop_time: "2070-01-02T00:00:00Z".to_string(),
};
let result = parse_delete(delete_str).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_parse_delete_no_predicate() {
let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
let expected = ParsedDelete {
table_name: "".to_string(),
predicate: "".to_string(),
start_time: "1970-01-01T00:00:00Z".to_string(),
stop_time: "2070-01-02T00:00:00Z".to_string(),
};
let result = parse_delete(delete_str).unwrap();
assert_eq!(expected, result);
}
// NGA todo: check content of error messages
#[test]
fn test_parse_delete_negative() {
// invalid key
let delete_str = r#"{"invalid":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
let result = parse_delete(delete_str);
let err = result.unwrap_err();
assert!(err
.to_string()
.contains("Invalid key which is either 'start', 'stop', or 'predicate'"));
// invalid timestamp value
let delete_str = r#"{"start":123,"stop":"2070-01-02T00:00:00Z"}"#;
let result = parse_delete(delete_str);
let err = result.unwrap_err();
assert!(err
.to_string()
.contains("Invalid timestamp or predicate value"));
// invalid JSON
let delete_str = r#"{"start":"1970-01-01T00:00:00Z",;"stop":"2070-01-02T00:00:00Z"}"#;
let result = parse_delete(delete_str);
let err = result.unwrap_err();
assert!(err.to_string().contains("Unable to parse delete string"));
}
}


@@ -1,4 +1,5 @@
pub mod delete_expr;
pub mod delete_predicate;
pub mod predicate;
pub mod regex;
pub mod serialize;


@@ -11,75 +11,12 @@ use std::{
use data_types::timestamp::TimestampRange;
use datafusion::{
error::DataFusionError,
logical_plan::{col, lit, lit_timestamp_nano, Column, Expr, Operator},
logical_plan::{col, lit_timestamp_nano, Expr, Operator},
optimizer::utils,
};
use datafusion_util::{make_range_expr, AndExprBuilder};
use internal_types::schema::TIME_COLUMN_NAME;
use observability_deps::tracing::debug;
use sqlparser::{
ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},
dialect::GenericDialect,
parser::Parser,
};
use snafu::{ResultExt, Snafu};
use chrono::DateTime;
const FLUX_TABLE: &str = "_measurement";
// Parse Delete Predicates
/// Parse Error
#[derive(Debug, Snafu)]
pub enum Error {
/// Invalid time format
#[snafu(display("Invalid timestamp: {}", value))]
InvalidTimestamp { value: String },
/// Invalid time range
#[snafu(display("Invalid time range: ({}, {})", start, stop))]
InvalidTimeRange { start: String, stop: String },
/// Predicate syntax error
#[snafu(display("Invalid predicate syntax: ({})", value))]
InvalidSyntax { value: String },
/// Predicate semantics error
#[snafu(display("Invalid predicate semantics: ({})", value))]
InvalidSemantics { value: String },
/// Predicate includes a non-supported expression
#[snafu(display("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_name != literal': ({})", value))]
NotSupportPredicate { value: String },
#[snafu(display(r#"Unable to parse delete string '{}'"#, value))]
DeleteInvalid {
source: serde_json::Error,
value: String,
},
#[snafu(display(
r#"Invalid key which is either 'start', 'stop', or 'predicate': '{}'"#,
value
))]
DeleteKeywordInvalid { value: String },
#[snafu(display(r#"Invalid timestamp or predicate value: '{}'"#, value))]
DeleteValueInvalid { value: String },
#[snafu(display(r#"Invalid JSON format of delete string '{}'"#, value))]
DeleteObjectInvalid { value: String },
#[snafu(display(r#"Invalid table name in delete '{}'"#, value))]
DeleteTableInvalid { value: String },
#[snafu(display(r#"Delete must include a start time and a stop time'{}'"#, value))]
DeleteStartStopInvalid { value: String },
}
/// Result type for Parser Client
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// This `Predicate` represents the empty predicate (aka that
/// evaluates to true for all rows).
@@ -532,332 +469,6 @@ impl PredicateBuilder {
}
}
/// Parser for Delete predicate and time range
#[derive(Debug, Clone)]
pub struct ParseDeletePredicate {
pub start_time: i64,
pub stop_time: i64,
// conjunctive predicate of binary expressions of = or !=
pub predicate: Vec<Expr>,
}
impl ParseDeletePredicate {
/// Create a ParseDeletePredicate
pub fn new(start_time: i64, stop_time: i64, predicate: Vec<Expr>) -> Self {
Self {
start_time,
stop_time,
predicate,
}
}
/// Parse and convert the strings of the delete gRPC API into a ParseDeletePredicate to send to the server
pub fn try_new(start: &str, stop: &str, predicate: &str) -> Result<Self> {
// parse and check time range
let (start_time, stop_time) = Self::parse_time_range(start, stop)?;
// Parse the predicate
let delete_exprs = Self::parse_predicate(predicate)?;
Ok(Self::new(start_time, stop_time, delete_exprs))
}
/// Parse the predicate and convert it into DataFusion expressions.
/// A delete predicate is a conjunction of binary expressions of the
/// form 'column = constant' or 'column != constant'.
///
pub fn parse_predicate(predicate: &str) -> Result<Vec<Expr>> {
if predicate.is_empty() {
return Ok(vec![]);
}
// "DELETE FROM table_name WHERE predicate"
// The table name can be anything; it is only there so sqlparser sees valid SQL syntax
let mut sql = "DELETE FROM table_name WHERE ".to_string();
sql.push_str(predicate);
// parse the delete sql
let dialect = GenericDialect {};
let ast = Parser::parse_sql(&dialect, sql.as_str());
match ast {
Err(parse_err) => {
let error_str = format!("{}, {}", predicate, parse_err);
Err(Error::InvalidSyntax { value: error_str })
}
Ok(mut stmt) => {
if stmt.len() != 1 {
return Err(Error::InvalidSemantics {
value: predicate.to_string(),
});
}
let stmt = stmt.pop();
match stmt {
Some(Statement::Delete {
table_name: _,
selection: Some(expr),
..
}) => {
// split this expr into smaller binary if any
let mut exprs = vec![];
let split = Self::split_members(&expr, &mut exprs);
if !split {
return Err(Error::NotSupportPredicate {
value: predicate.to_string(),
});
}
Ok(exprs)
}
_ => Err(Error::InvalidSemantics {
value: predicate.to_string(),
}),
}
}
}
}
pub fn build_delete_predicate(
start_time: String,
stop_time: String,
predicate: String,
) -> Result<Predicate, Error> {
// parse time range and the predicate
let parse_delete_pred = ParseDeletePredicate::try_new(
start_time.as_str(),
stop_time.as_str(),
predicate.as_str(),
)?;
let mut del_predicate = PredicateBuilder::new()
.timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time)
.build();
// Add the predicate binary expressions
for expr in parse_delete_pred.predicate {
del_predicate.exprs.push(expr);
}
Ok(del_predicate)
}
/// Recursively split all "AND" expressions into smaller ones
/// Example: "A AND B AND C" => [A, B, C]
/// Returns false unless all of them are ANDs of binary expressions of
/// the form "column_name = literal" or "column_name != literal"
///
/// The split expressions are converted into DataFusion expressions
pub fn split_members(predicate: &SqlParserExpr, predicates: &mut Vec<Expr>) -> bool {
// The code below is built to be compatible with
// https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go
match predicate {
SqlParserExpr::BinaryOp {
left,
op: BinaryOperator::And,
right,
} => {
if !Self::split_members(left, predicates) {
return false;
}
if !Self::split_members(right, predicates) {
return false;
}
}
SqlParserExpr::BinaryOp { left, op, right } => {
// Verify Operator
let op = match op {
BinaryOperator::Eq => Operator::Eq,
BinaryOperator::NotEq => Operator::NotEq,
_ => return false,
};
// verify if left is identifier (column name)
let column = match &**left {
SqlParserExpr::Identifier(Ident {
value,
quote_style: _, // all quotes are ignored as done in idpe
}) => Expr::Column(Column {
relation: None,
name: value.to_string(),
}),
_ => return false, // not a column name
};
// verify if right is a literal or an identifier (e.g column name)
let value = match &**right {
SqlParserExpr::Identifier(Ident {
value,
quote_style: _,
}) => lit(value.to_string()),
SqlParserExpr::Value(Value::DoubleQuotedString(value)) => {
lit(value.to_string())
}
SqlParserExpr::Value(Value::SingleQuotedString(value)) => {
lit(value.to_string())
}
SqlParserExpr::Value(Value::NationalStringLiteral(value)) => {
lit(value.to_string())
}
SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()),
SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::<i64>() {
Ok(v) => lit(v),
Err(_) => lit(v.parse::<f64>().unwrap()),
},
SqlParserExpr::Value(Value::Boolean(v)) => lit(*v),
_ => return false, // not a literal
};
let expr = Expr::BinaryExpr {
left: Box::new(column),
op,
right: Box::new(value),
};
predicates.push(expr);
}
_ => return false,
}
true
}
/// Parse a time string and return its value in nanoseconds
pub fn parse_time(input: &str) -> Result<i64> {
// The input can be an RFC3339 timestamp ending with Z, such as 1970-01-01T00:00:00Z
// See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame
let datetime_result = DateTime::parse_from_rfc3339(input);
match datetime_result {
Ok(datetime) => Ok(datetime.timestamp_nanos()),
Err(timestamp_err) => {
// See if it is in nanosecond form
let time_result = input.parse::<i64>();
match time_result {
Ok(nano) => Ok(nano),
Err(nano_err) => {
// wrong format, return both errors
let error_str = format!("{}, {}", timestamp_err, nano_err);
Err(Error::InvalidTimestamp { value: error_str })
}
}
}
}
}
/// Parse a time range [start, stop]
pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> {
let start_time = Self::parse_time(start)?;
let stop_time = Self::parse_time(stop)?;
if start_time > stop_time {
return Err(Error::InvalidTimeRange {
start: start.to_string(),
stop: stop.to_string(),
});
}
Ok((start_time, stop_time))
}
}
// Note that this struct and its functions are used to parse FLUX DELETE,
// https://docs.influxdata.com/influxdb/v2.0/write-data/delete-data/, which happens before
// the parsing of timestamps and sql predicate. The examples below will show FLUX DELETE's syntax which is
// different from SQL syntax so we need this extra parsing step before invoking sqlparser to parse the
// sql-format predicates and timestamps
#[derive(Debug, Default, PartialEq, Clone)]
/// data of a parsed delete
pub struct ParsedDelete {
/// Empty string, "", if no table specified
pub table_name: String,
pub start_time: String,
pub stop_time: String,
pub predicate: String,
}
/// Return parsed data of an influx delete:
/// A few input examples and their parsed results:
/// {"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
/// => table_name="mytable", start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=\"Orient.local\"""
/// {"predicate":"host=Orient.local and val != 50","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
/// => start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=Orient.local and val != 50"
pub fn parse_delete(input: &str) -> Result<ParsedDelete> {
let parsed_obj: serde_json::Value =
serde_json::from_str(input).context(DeleteInvalid { value: input })?;
let mut parsed_delete = ParsedDelete::default();
if let serde_json::Value::Object(items) = parsed_obj {
for item in items {
// The value must be type String
if let Some(val) = item.1.as_str() {
match item.0.to_lowercase().as_str() {
"start" => parsed_delete.start_time = val.to_string(),
"stop" => parsed_delete.stop_time = val.to_string(),
"predicate" => parsed_delete.predicate = val.to_string(),
_ => {
return Err(Error::DeleteKeywordInvalid {
value: input.to_string(),
})
}
}
} else {
return Err(Error::DeleteValueInvalid {
value: input.to_string(),
});
}
}
} else {
return Err(Error::DeleteObjectInvalid {
value: input.to_string(),
});
}
// Start or stop is empty
if parsed_delete.start_time.is_empty() || parsed_delete.stop_time.is_empty() {
return Err(Error::DeleteStartStopInvalid {
value: input.to_string(),
});
}
// Extract table from the predicate if any
if parsed_delete.predicate.contains(FLUX_TABLE) {
// since the predicate is a conjunctive expression, split it on "and"
let predicate = parsed_delete
.predicate
.replace(" AND ", " and ")
.replace(" ANd ", " and ")
.replace(" And ", " and ")
.replace(" AnD ", " and ");
let split: Vec<&str> = predicate.split("and").collect();
let mut predicate_no_table = "".to_string();
for s in split {
if s.contains(FLUX_TABLE) {
// This should be in form "_measurement = <your_table_name>"
// keep only <your_table_name> by replacing the rest with ""
let table_name = s
.replace(FLUX_TABLE, "")
.replace("=", "")
.trim()
.to_string();
// Do not support white spaces in table name
if table_name.contains(' ') {
return Err(Error::DeleteTableInvalid {
value: input.to_string(),
});
}
parsed_delete.table_name = table_name;
} else {
// This is a normal column comparison, put it back to send to sqlparser later
if !predicate_no_table.is_empty() {
predicate_no_table.push_str(" and ")
}
predicate_no_table.push_str(s.trim());
}
}
parsed_delete.predicate = predicate_no_table;
}
Ok(parsed_delete)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -999,282 +610,4 @@ mod tests {
assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo = Int32(42)]");
}
// The delete predicate
#[test]
fn test_time_range_valid() {
let start = r#"100"#;
let stop = r#"100"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (100, 100);
assert_eq!(result, expected);
let start = r#"100"#;
let stop = r#"200"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (100, 200);
assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
let stop = r#"1970-01-01T00:00:00Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 0);
assert_eq!(result, expected);
// let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
// let stop = r#"now()"#; // -- Not working. Need to find a way to test this
// let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
// let expected = (0, 0);
// assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#;
let stop = r#"100"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 100);
assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#;
let stop = r#"1970-01-01T00:01:00Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 60000000000);
assert_eq!(result, expected);
}
#[test]
fn test_time_range_invalid() {
let start = r#"100"#;
let stop = r#"-100"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"100"#;
let stop = r#"50"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"100"#;
let stop = r#"1970-01-01T00:00:00Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"1971-09-01T00:00:10Z"#;
let stop = r#"1971-09-01T00:00:05Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
}
#[test]
fn test_parse_timestamp() {
let input = r#"123"#;
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 123);
// must parse time
let input = r#"1970-01-01T00:00:00Z"#;
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 0);
let input = r#"1971-02-01T15:30:21Z"#;
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 34270221000000000);
}
#[test]
fn test_parse_timestamp_negative() {
let input = r#"-123"#;
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, -123);
}
#[test]
fn test_parse_timestamp_invalid() {
let input = r#"123gdb"#;
ParseDeletePredicate::parse_time(input).unwrap_err();
let input = r#"1970-01-01T00:00:00"#;
ParseDeletePredicate::parse_time(input).unwrap_err();
// looks like a 1971 timestamp, but the missing 'T' separator makes it unparseable
let input = r#"1971-02-01:30:21Z"#;
ParseDeletePredicate::parse_time(input).unwrap_err();
}
#[test]
fn test_parse_timestamp_out_of_range() {
let input = r#"99999999999999999999999999999999"#;
let time = ParseDeletePredicate::parse_time(input);
assert!(time.is_err());
}
#[test]
fn test_parse_predicate() {
let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#;
let result = ParseDeletePredicate::parse_predicate(pred).unwrap();
println!("{:#?}", result);
let mut expected = vec![];
let e = col("city").eq(lit("Boston"));
expected.push(e);
let val: i64 = 100;
let e = col("cost").not_eq(lit(val));
expected.push(e);
let e = col("state").not_eq(lit("MA"));
expected.push(e);
let e = col("temp").eq(lit(87.5));
expected.push(e);
assert_eq!(result, expected)
}
#[test]
fn test_parse_predicate_invalid() {
let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"cost > 100"#; // >
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"cost <= 100"#; // <
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"cost gt 100"#; // >
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
let pred = r#"city = cost = 100"#; // >
let result = ParseDeletePredicate::parse_predicate(pred);
assert!(result.is_err());
}
#[test]
fn test_full_delete_pred() {
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
let stop = r#"200"#;
let pred = r#"cost != 100"#;
let result = ParseDeletePredicate::try_new(start, stop, pred).unwrap();
assert_eq!(result.start_time, 0);
assert_eq!(result.stop_time, 200);
let mut expected = vec![];
let num: i64 = 100;
let e = col("cost").not_eq(lit(num));
expected.push(e);
assert_eq!(result.predicate, expected);
}
#[test]
fn test_full_delete_pred_invalid_time_range() {
let start = r#"100"#;
let stop = r#"50"#;
let pred = r#"cost != 100"#;
let result = ParseDeletePredicate::try_new(start, stop, pred);
assert!(result.is_err());
}
#[test]
fn test_full_delete_pred_invalid_pred() {
let start = r#"100"#;
let stop = r#"200"#;
let pred = r#"cost > 100"#;
let result = ParseDeletePredicate::try_new(start, stop, pred);
assert!(result.is_err());
}
#[test]
fn test_parse_delete_full() {
let delete_str = r#"{"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
let expected = ParsedDelete {
table_name: "mytable".to_string(),
predicate: "host=\"Orient.local\"".to_string(),
start_time: "1970-01-01T00:00:00Z".to_string(),
stop_time: "2070-01-02T00:00:00Z".to_string(),
};
let result = parse_delete(delete_str).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_parse_delete_no_table() {
let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#;
let expected = ParsedDelete {
table_name: "".to_string(),
predicate: "host=\"Orient.local\"".to_string(),
start_time: "1970-01-01T00:00:00Z".to_string(),
stop_time: "2070-01-02T00:00:00Z".to_string(),
};
let result = parse_delete(delete_str).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_parse_delete_empty_predicate() {
let delete_str =
r#"{"start":"1970-01-01T00:00:00Z","predicate":"","stop":"2070-01-02T00:00:00Z"}"#;
let expected = ParsedDelete {
table_name: "".to_string(),
predicate: "".to_string(),
start_time: "1970-01-01T00:00:00Z".to_string(),
stop_time: "2070-01-02T00:00:00Z".to_string(),
};
let result = parse_delete(delete_str).unwrap();
assert_eq!(expected, result);
}
#[test]
fn test_parse_delete_no_predicate() {
let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
let expected = ParsedDelete {
table_name: "".to_string(),
predicate: "".to_string(),
start_time: "1970-01-01T00:00:00Z".to_string(),
stop_time: "2070-01-02T00:00:00Z".to_string(),
};
let result = parse_delete(delete_str).unwrap();
assert_eq!(expected, result);
}
// NGA todo: check content of error messages
#[test]
fn test_parse_delete_negative() {
// invalid key
let delete_str = r#"{"invalid":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
let result = parse_delete(delete_str);
let err = result.unwrap_err();
assert!(err
.to_string()
.contains("Invalid key which is either 'start', 'stop', or 'predicate'"));
// invalid timestamp value
let delete_str = r#"{"start":123,"stop":"2070-01-02T00:00:00Z"}"#;
let result = parse_delete(delete_str);
let err = result.unwrap_err();
assert!(err
.to_string()
.contains("Invalid timestamp or predicate value"));
// invalid JSON
let delete_str = r#"{"start":"1970-01-01T00:00:00Z",;"stop":"2070-01-02T00:00:00Z"}"#;
let result = parse_delete(delete_str);
let err = result.unwrap_err();
assert!(err.to_string().contains("Unable to parse delete string"));
}
}


@@ -12,36 +12,24 @@ use data_types::timestamp::TimestampRange;
use generated_types::influxdata::iox::catalog::v1 as proto;
use snafu::{ResultExt, Snafu};
use crate::{delete_expr::DeleteExpr, predicate::Predicate};
use crate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate};
#[derive(Debug, Snafu)]
pub enum SerializeError {
#[snafu(display("cannot convert datafusion expr: {}", source))]
CannotConvertDataFusionExpr {
source: crate::delete_expr::DataFusionToExprError,
},
}
/// Serialize IOx [`Predicate`] to a protobuf object.
pub fn serialize(predicate: &Predicate) -> Result<proto::Predicate, SerializeError> {
let proto_predicate = proto::Predicate {
/// Serialize IOx [`DeletePredicate`] to a protobuf object.
pub fn serialize(predicate: &DeletePredicate) -> proto::Predicate {
proto::Predicate {
table_names: serialize_optional_string_set(&predicate.table_names),
field_columns: serialize_optional_string_set(&predicate.field_columns),
partition_key: serialize_optional_string(&predicate.partition_key),
range: serialize_timestamp_range(&predicate.range),
range: Some(proto::TimestampRange {
start: predicate.range.start,
end: predicate.range.end,
}),
exprs: predicate
.exprs
.iter()
.map(|expr| {
let expr: DeleteExpr = expr
.clone()
.try_into()
.context(CannotConvertDataFusionExpr)?;
Ok(expr.into())
})
.collect::<Result<Vec<proto::Expr>, SerializeError>>()?,
};
Ok(proto_predicate)
.map(|expr| expr.clone().into())
.collect(),
}
}
fn serialize_optional_string_set(
@@ -57,36 +45,41 @@ fn serialize_optional_string(s: &Option<String>) -> Option<proto::OptionalString
.map(|s| proto::OptionalString { value: s.clone() })
}
fn serialize_timestamp_range(r: &Option<TimestampRange>) -> Option<proto::TimestampRange> {
r.as_ref().map(|r| proto::TimestampRange {
start: r.start,
end: r.end,
})
}
#[derive(Debug, Snafu)]
pub enum DeserializeError {
#[snafu(display("timestamp range is missing"))]
RangeMissing,
#[snafu(display("cannot deserialize expr: {}", source))]
CannotDeserializeExpr {
source: crate::delete_expr::ProtoToExprError,
},
}
/// Deserialize IOx [`Predicate`] from a protobuf object.
pub fn deserialize(proto_predicate: &proto::Predicate) -> Result<Predicate, DeserializeError> {
let predicate = Predicate {
/// Deserialize IOx [`DeletePredicate`] from a protobuf object.
pub fn deserialize(
proto_predicate: &proto::Predicate,
) -> Result<DeletePredicate, DeserializeError> {
let predicate = DeletePredicate {
table_names: deserialize_optional_string_set(&proto_predicate.table_names),
field_columns: deserialize_optional_string_set(&proto_predicate.field_columns),
partition_key: deserialize_optional_string(&proto_predicate.partition_key),
range: deserialize_timestamp_range(&proto_predicate.range),
range: proto_predicate
.range
.as_ref()
.map(|r| TimestampRange {
start: r.start,
end: r.end,
})
.ok_or(DeserializeError::RangeMissing)?,
exprs: proto_predicate
.exprs
.iter()
.map(|expr| {
let expr: DeleteExpr = expr.clone().try_into().context(CannotDeserializeExpr)?;
Ok(expr.into())
Ok(expr)
})
.collect::<Result<Vec<datafusion::logical_plan::Expr>, DeserializeError>>()?,
.collect::<Result<Vec<DeleteExpr>, DeserializeError>>()?,
};
Ok(predicate)
}
@@ -101,16 +94,9 @@ fn deserialize_optional_string(s: &Option<proto::OptionalString>) -> Option<Stri
s.as_ref().map(|s| s.value.clone())
}
fn deserialize_timestamp_range(r: &Option<proto::TimestampRange>) -> Option<TimestampRange> {
r.as_ref().map(|r| TimestampRange {
start: r.start,
end: r.end,
})
}
#[cfg(test)]
mod tests {
use crate::predicate::{ParseDeletePredicate, PredicateBuilder};
use crate::delete_predicate::ParseDeletePredicate;
use super::*;
@@ -118,12 +104,12 @@ mod tests {
fn test_roundtrip() {
let table_name = "my_table";
let predicate = delete_predicate(table_name);
let proto = serialize(&predicate).unwrap();
let proto = serialize(&predicate);
let recovered = deserialize(&proto).unwrap();
assert_eq!(predicate, recovered);
}
fn delete_predicate(table_name: &str) -> Predicate {
fn delete_predicate(table_name: &str) -> DeletePredicate {
let start_time = "11";
let stop_time = "22";
let predicate = r#"city=Boston and cost!=100 and temp=87.5 and good=true"#;
@@ -131,14 +117,8 @@ mod tests {
let parse_delete_pred =
ParseDeletePredicate::try_new(start_time, stop_time, predicate).unwrap();
let mut del_predicate_builder = PredicateBuilder::new()
.table(table_name)
.timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time);
for expr in parse_delete_pred.predicate {
del_predicate_builder = del_predicate_builder.add_expr(expr);
}
del_predicate_builder.build()
let mut pred: DeletePredicate = parse_delete_pred.into();
pred.table_names = Some(IntoIterator::into_iter([table_name.to_string()]).collect());
pred
}
}
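With serialization now infallible, only deserialization can fail (a missing range or an unconvertible expr); a sketch of the round trip under those assumptions (function name ours):

use predicate::{delete_predicate::DeletePredicate, serialize};

fn sketch_roundtrip(pred: &DeletePredicate) {
    let proto = serialize::serialize(pred); // no Result anymore
    let back = serialize::deserialize(&proto).unwrap(); // errs on missing range or bad expr
    assert_eq!(*pred, back);
}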


@@ -19,7 +19,10 @@ use internal_types::{
selection::Selection,
};
use observability_deps::tracing::{debug, trace};
use predicate::predicate::{Predicate, PredicateMatch};
use predicate::{
delete_predicate::DeletePredicate,
predicate::{Predicate, PredicateMatch},
};
use hashbrown::HashMap;
use std::{fmt::Debug, sync::Arc};
@@ -46,7 +49,7 @@ pub trait QueryChunkMeta: Sized {
fn schema(&self) -> Arc<Schema>;
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &[Arc<Predicate>];
fn delete_predicates(&self) -> &[Arc<DeletePredicate>];
}
/// A `Database` is the main trait implemented by the IOx subsystems
@@ -166,7 +169,7 @@ where
self.as_ref().schema()
}
fn delete_predicates(&self) -> &[Arc<Predicate>] {
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
let pred = self.as_ref().delete_predicates();
debug!(?pred, "Delete predicate in QueryChunkMeta");
pred


@@ -774,8 +774,12 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
// Add Filter operator, FilterExec, if the chunk has delete predicates
let del_preds = chunk.delete_predicates();
let del_preds: Vec<Arc<Predicate>> = del_preds
.iter()
.map(|pred| Arc::new(pred.as_ref().clone().into()))
.collect();
debug!(?del_preds, "Chunk delete predicates");
let negated_del_expr_val = Predicate::negated_expr(del_preds);
let negated_del_expr_val = Predicate::negated_expr(&del_preds[..]);
if let Some(negated_del_expr) = negated_del_expr_val {
debug!(?negated_del_expr, "Logical negated expressions");


@@ -117,9 +117,13 @@ impl<C: QueryChunk + 'static> ExecutionPlan for IOxReadFilterNode<C> {
let selection = Selection::Some(&selection_cols);
let del_preds = chunk.delete_predicates();
let del_preds: Vec<Arc<Predicate>> = del_preds
.iter()
.map(|pred| Arc::new(pred.as_ref().clone().into()))
.collect();
let stream = chunk
.read_filter(&self.predicate, selection, del_preds)
.read_filter(&self.predicate, selection, &del_preds)
.map_err(|e| {
DataFusionError::Execution(format!(
"Error creating scan for table {} chunk {}: {}",


@@ -27,6 +27,7 @@ use internal_types::{
};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use predicate::delete_predicate::DeletePredicate;
use snafu::Snafu;
use std::num::NonZeroU64;
use std::{collections::BTreeMap, fmt, sync::Arc};
@@ -175,7 +176,7 @@ pub struct TestChunk {
predicate_match: Option<PredicateMatch>,
/// Copy of delete predicates passed
delete_predicates: Vec<Arc<Predicate>>,
delete_predicates: Vec<Arc<DeletePredicate>>,
/// Order of this chunk relative to other overlapping chunks.
order: ChunkOrder,
@@ -914,7 +915,7 @@ impl QueryChunkMeta for TestChunk {
}
// return a reference to delete predicates of the chunk
fn delete_predicates(&self) -> &[Arc<Predicate>] {
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
let pred = &self.delete_predicates;
debug!(?pred, "Delete predicate in Test Chunk");


@@ -1,8 +1,9 @@
//! This module contains testing scenarios for Delete
use data_types::chunk_metadata::ChunkId;
use datafusion::logical_plan::{col, lit};
use predicate::predicate::{Predicate, PredicateBuilder};
use data_types::timestamp::TimestampRange;
use predicate::delete_expr::DeleteExpr;
use predicate::delete_predicate::DeletePredicate;
use async_trait::async_trait;
use query::QueryChunk;
@@ -32,10 +33,13 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
let lp_lines = vec!["cpu bar=1 10", "cpu bar=2 20"];
// delete predicate
let pred = PredicateBuilder::new()
.table(table_name)
.timestamp_range(0, 25)
.build();
let pred = DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 0, end: 25 },
exprs: vec![],
};
// this returns 15 scenarios
all_delete_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key)
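
For orientation, this is the struct shape those literals imply; the authoritative definition lives in `predicate::delete_predicate`, and the `BTreeSet` containers below are assumptions read off the construction sites, not taken from that module:

use std::collections::BTreeSet;
use data_types::timestamp::TimestampRange;
use predicate::delete_expr::DeleteExpr;

pub struct DeletePredicate {
    /// Restrict the delete to these tables, if set.
    pub table_names: Option<BTreeSet<String>>,
    /// Restrict the delete to these field columns, if set.
    pub field_columns: Option<BTreeSet<String>>,
    /// Restrict the delete to a single partition, if set.
    pub partition_key: Option<String>,
    /// Half-open time range `[start, end)` the delete applies to.
    pub range: TimestampRange,
    /// Column/op/scalar expressions, ANDed together.
    pub exprs: Vec<DeleteExpr>,
}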
@ -56,12 +60,17 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
let lp_lines = vec!["cpu bar=1 10", "cpu bar=2 20"];
// delete predicate
let expr = col("bar").eq(lit(1f64));
let pred = PredicateBuilder::new()
.table(table_name)
.timestamp_range(0, 15)
.add_expr(expr)
.build();
let pred = DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 0, end: 15 },
exprs: vec![DeleteExpr::new(
"bar".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::F64((1.0).into()),
)],
};
// this returns 15 scenarios
all_delete_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key)
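
`DeleteExpr::new(column, op, scalar)` replaces the earlier DataFusion `col(..).eq(lit(..))` expressions. A sketch of only the pieces this diff exercises (the float wrapper behind `(1.0).into()` is an assumption, e.g. `ordered_float::OrderedFloat<f64>`):

pub struct DeleteExpr {
    pub column: String,
    pub op: Op,
    pub scalar: Scalar,
}

pub enum Op {
    Eq, // column = scalar
    Ne, // column != scalar
}

pub enum Scalar {
    I64(i64),
    F64(ordered_float::OrderedFloat<f64>), // assumed wrapper; enables `(1.0).into()`
    String(String),
}

impl DeleteExpr {
    pub fn new(column: String, op: Op, scalar: Scalar) -> Self {
        Self { column, op, scalar }
    }
}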
@ -85,14 +94,24 @@ impl DbSetup for OneDeleteMultiExprsOneChunk {
"cpu,foo=me bar=1 40",
];
// delete predicate
let expr1 = col("bar").eq(lit(1f64));
let expr2 = col("foo").eq(lit("me"));
let pred = PredicateBuilder::new()
.table("cpu")
.timestamp_range(0, 32)
.add_expr(expr1)
.add_expr(expr2)
.build();
let pred = DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 0, end: 32 },
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::F64((1.0).into()),
),
DeleteExpr::new(
"foo".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::String("me".to_string()),
),
],
};
// this returns 15 scenarios
all_delete_scenarios_for_one_chunk(vec![&pred], vec![], lp_lines, table_name, partition_key)
@ -122,22 +141,37 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
];
// delete predicate
// pred1: delete from cpu where 0 <= time < 32 and bar = 1 and foo = 'me'
let expr1 = col("bar").eq(lit(1f64));
let expr2 = col("foo").eq(lit("me"));
let pred1 = PredicateBuilder::new()
.table("cpu")
.timestamp_range(0, 32)
.add_expr(expr1)
.add_expr(expr2)
.build();
let pred1 = DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 0, end: 32 },
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::F64((1.0).into()),
),
DeleteExpr::new(
"foo".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::String("me".to_string()),
),
],
};
// pred2: delete from cpu where 10 <= time < 45 and bar != 1
let expr3 = col("bar").not_eq(lit(1f64));
let pred2 = PredicateBuilder::new()
.table("cpu")
.timestamp_range(10, 45)
.add_expr(expr3)
.build();
let pred2 = DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 10, end: 45 },
exprs: vec![DeleteExpr::new(
"bar".to_string(),
predicate::delete_expr::Op::Ne,
predicate::delete_expr::Scalar::F64((1.0).into()),
)],
};
// build scenarios
all_delete_scenarios_for_one_chunk(
@ -171,14 +205,24 @@ impl DbSetup for ThreeDeleteThreeChunks {
];
// delete predicate on chunk 1
//let i: f64 = 1.0;
let expr1 = col("bar").eq(lit(1f64));
let expr2 = col("foo").eq(lit("me"));
let pred1 = PredicateBuilder::new()
.table("cpu")
.timestamp_range(0, 32)
.add_expr(expr1)
.add_expr(expr2)
.build();
let pred1 = DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 0, end: 32 },
exprs: vec![
DeleteExpr::new(
"bar".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::F64((1.0).into()),
),
DeleteExpr::new(
"foo".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::String("me".to_string()),
),
],
};
//chunk 2 data
let lp_lines_2 = vec![
@ -188,12 +232,17 @@ impl DbSetup for ThreeDeleteThreeChunks {
"cpu,foo=me bar=5 60",
];
// delete predicate on chunk 1 & chunk 2
let expr = col("foo").eq(lit("you"));
let pred2 = PredicateBuilder::new()
.table("cpu")
.timestamp_range(20, 45)
.add_expr(expr)
.build();
let pred2 = DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 20, end: 45 },
exprs: vec![DeleteExpr::new(
"foo".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::String("you".to_string()),
)],
};
// chunk 3 data
let lp_lines_3 = vec![
@ -203,13 +252,17 @@ impl DbSetup for ThreeDeleteThreeChunks {
"cpu,foo=me bar=8 90", // deleted by pred3
];
// delete predicate on chunk 3
let i: f64 = 7.0;
let expr = col("bar").not_eq(lit(i));
let pred3 = PredicateBuilder::new()
.table("cpu")
.timestamp_range(75, 95)
.add_expr(expr)
.build();
let pred3 = DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 75, end: 95 },
exprs: vec![DeleteExpr::new(
"bar".to_string(),
predicate::delete_expr::Op::Ne,
predicate::delete_expr::Scalar::F64((7.0).into()),
)],
};
// ----------------------
// 3 chunks: MUB, RUB, OS
@ -412,7 +465,7 @@ impl ChunkStage {
#[derive(Debug, Clone)]
pub struct Pred<'a> {
/// Delete predicate
predicate: &'a Predicate,
predicate: &'a DeletePredicate,
/// At which chunk stage this predicate is applied
delete_time: DeleteTime,
}
@ -464,9 +517,9 @@ impl DeleteTime {
/// Exhaustive tests of chunk stages and their lifecycle moves for a given set of delete predicates
async fn all_delete_scenarios_for_one_chunk(
// These delete predicates are applied at all stages of the chunk life cycle
chunk_stage_preds: Vec<&Predicate>,
chunk_stage_preds: Vec<&DeletePredicate>,
// These delete predicates are applied to all chunks at their final stages
at_end_preds: Vec<&Predicate>,
at_end_preds: Vec<&DeletePredicate>,
// Single chunk data
lp_lines: Vec<&str>,
// Table of the chunk
@ -677,7 +730,7 @@ async fn make_chunk_with_deletes_at_different_stages(
// function will create much more complicated cases to handle
async fn make_different_stage_chunks_with_deletes_scenario(
data: Vec<ChunkData<'_>>,
preds: Vec<&Predicate>,
preds: Vec<&DeletePredicate>,
table_name: &str,
partition_key: &str,
) -> DbScenario {

View File

@ -39,7 +39,7 @@ use parquet_file::catalog::{
prune::prune_history as prune_catalog_transaction_history,
};
use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
use predicate::predicate::Predicate;
use predicate::{delete_predicate::DeletePredicate, predicate::Predicate};
use query::{
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
QueryDatabase,
@ -546,7 +546,7 @@ impl Db {
pub async fn delete(
self: &Arc<Self>,
table_name: &str,
delete_predicate: Arc<Predicate>,
delete_predicate: Arc<DeletePredicate>,
) -> Result<()> {
// collect delete predicates on preserved partitions for a catalog transaction
let mut affected_persisted_chunks = vec![];
@ -1263,8 +1263,10 @@ impl CatalogProvider for Db {
pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData {
let mut files = HashMap::new();
let mut delete_predicates: HashMap<usize, (Arc<Predicate>, Vec<ChunkAddrWithoutDatabase>)> =
Default::default();
let mut delete_predicates: HashMap<
usize,
(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>),
> = Default::default();
for chunk in catalog.chunks() {
let guard = chunk.read();
@ -1288,8 +1290,8 @@ pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData
|| guard.is_in_lifecycle(ChunkLifecycleAction::Persisting)
{
for predicate in guard.delete_predicates() {
let predicate_ref: &Predicate = predicate.as_ref();
let addr = (predicate_ref as *const Predicate) as usize;
let predicate_ref: &DeletePredicate = predicate.as_ref();
let addr = (predicate_ref as *const DeletePredicate) as usize;
delete_predicates
.entry(addr)
.and_modify(|(_predicate, v)| v.push(guard.addr().clone().into()))
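
The `addr` key above groups chunks that share the same predicate allocation: every `Arc` clone points at one heap object, so the raw pointer doubles as a cheap identity key. The same trick in isolation (a sketch, not the catalog code):

use std::{collections::HashMap, sync::Arc};

/// Group values by the heap address of the `Arc` they arrived with.
fn group_by_arc_identity<P, V: Clone>(
    items: &[(Arc<P>, V)],
) -> HashMap<usize, (Arc<P>, Vec<V>)> {
    let mut groups: HashMap<usize, (Arc<P>, Vec<V>)> = HashMap::new();
    for (pred, val) in items {
        let addr = Arc::as_ptr(pred) as usize;
        groups
            .entry(addr)
            .and_modify(|(_, vs)| vs.push(val.clone()))
            .or_insert_with(|| (Arc::clone(pred), vec![val.clone()]));
    }
    groups
}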
@ -1415,10 +1417,8 @@ mod tests {
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use chrono::{DateTime, TimeZone};
use datafusion::logical_plan::{col, lit};
use futures::{stream, StreamExt, TryStreamExt};
use predicate::predicate::PredicateBuilder;
use predicate::delete_expr::DeleteExpr;
use tokio_util::sync::CancellationToken;
use ::test_helpers::{assert_contains, maybe_start_logging};
@ -1427,6 +1427,7 @@ mod tests {
chunk_metadata::{ChunkAddr, ChunkStorage},
database_rules::{LifecycleRules, PartitionTemplate, TemplatePart},
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
timestamp::TimestampRange,
write_summary::TimestampSummary,
};
use entry::test_helpers::lp_to_entry;
@ -3685,14 +3686,20 @@ mod tests {
.unwrap();
// ==================== do: delete ====================
let expr = col("selector").eq(lit(1i64));
let pred = Arc::new(
PredicateBuilder::new()
.table("cpu")
.timestamp_range(0, 1_000)
.add_expr(expr)
.build(),
);
let pred = Arc::new(DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange {
start: 0,
end: 1_000,
},
exprs: vec![DeleteExpr::new(
"selector".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::I64(1),
)],
});
db.delete("cpu", Arc::clone(&pred)).await.unwrap();
// ==================== do: preserve another partition ====================

View File

@ -16,7 +16,7 @@ use internal_types::{access::AccessRecorder, schema::Schema};
use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk};
use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
use predicate::predicate::Predicate;
use predicate::delete_predicate::DeletePredicate;
use read_buffer::RBChunk;
use tracker::{TaskRegistration, TaskTracker};
@ -80,7 +80,7 @@ pub struct ChunkMetadata {
pub schema: Arc<Schema>,
/// Delete predicates of this chunk
pub delete_predicates: Vec<Arc<Predicate>>,
pub delete_predicates: Vec<Arc<DeletePredicate>>,
}
/// Different memory representations of a frozen chunk.
@ -311,7 +311,7 @@ impl CatalogChunk {
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
metrics: ChunkMetrics,
delete_predicates: Vec<Arc<Predicate>>,
delete_predicates: Vec<Arc<DeletePredicate>>,
order: ChunkOrder,
) -> Self {
let stage = ChunkStage::Frozen {
@ -346,7 +346,7 @@ impl CatalogChunk {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
metrics: ChunkMetrics,
delete_predicates: Vec<Arc<Predicate>>,
delete_predicates: Vec<Arc<DeletePredicate>>,
order: ChunkOrder,
) -> Self {
assert_eq!(chunk.table_name(), addr.table_name.as_ref());
@ -473,7 +473,7 @@ impl CatalogChunk {
}
}
pub fn add_delete_predicate(&mut self, delete_predicate: Arc<Predicate>) {
pub fn add_delete_predicate(&mut self, delete_predicate: Arc<DeletePredicate>) {
debug!(
?delete_predicate,
"Input delete predicate to CatalogChunk add_delete_predicate"
@ -498,7 +498,7 @@ impl CatalogChunk {
}
}
pub fn delete_predicates(&self) -> &[Arc<Predicate>] {
pub fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
match &self.stage {
ChunkStage::Open { mb_chunk: _ } => {
// no delete predicate for open chunk
@ -686,13 +686,13 @@ impl CatalogChunk {
///
/// This only works for chunks in the _open_ stage (chunk is converted) and the _frozen_ stage
/// (no-op) and will fail for other stages.
pub fn freeze_with_predicate(&mut self, delete_predicate: Arc<Predicate>) -> Result<()> {
pub fn freeze_with_predicate(&mut self, delete_predicate: Arc<DeletePredicate>) -> Result<()> {
self.freeze_with_delete_predicates(vec![delete_predicate])
}
fn freeze_with_delete_predicates(
&mut self,
delete_predicates: Vec<Arc<Predicate>>,
delete_predicates: Vec<Arc<DeletePredicate>>,
) -> Result<()> {
match &self.stage {
ChunkStage::Open { mb_chunk, .. } => {
@ -906,7 +906,7 @@ impl CatalogChunk {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::logical_plan::{col, lit};
use data_types::timestamp::TimestampRange;
use entry::test_helpers::lp_to_entry;
use mutable_buffer::chunk::ChunkMetrics as MBChunkMetrics;
@ -916,7 +916,7 @@ mod tests {
make_chunk as make_parquet_chunk_with_store, make_iox_object_store, TestSize,
},
};
use predicate::predicate::PredicateBuilder;
use predicate::delete_expr::DeleteExpr;
#[test]
fn test_new_open() {
@ -1083,18 +1083,20 @@ mod tests {
assert_eq!(del_preds.len(), 0);
// Build delete predicate and expected output
let expr1 = col("city").eq(lit("Boston"));
let del_pred1 = PredicateBuilder::new()
.table("test")
.timestamp_range(1, 100)
.add_expr(expr1)
.build();
let mut expected_exprs1 = vec![];
let e = col("city").eq(lit("Boston"));
expected_exprs1.push(e);
let del_pred1 = Arc::new(DeletePredicate {
table_names: Some(IntoIterator::into_iter(["test".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 0, end: 100 },
exprs: vec![DeleteExpr::new(
"city".to_string(),
predicate::delete_expr::Op::Eq,
predicate::delete_expr::Scalar::String("Boston".to_string()),
)],
});
// Add a delete predicate to the open chunk = simulate a delete on an open chunk
chunk.add_delete_predicate(Arc::new(del_pred1));
chunk.add_delete_predicate(Arc::clone(&del_pred1));
// chunk must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have a delete predicate
@ -1102,26 +1104,22 @@ mod tests {
assert_eq!(del_preds.len(), 1);
// verify delete predicate value
let pred = &del_preds[0];
if let Some(range) = pred.range {
assert_eq!(range.start, 1); // start time
assert_eq!(range.end, 100); // stop time
} else {
panic!("No time range set for delete predicate");
}
assert_eq!(pred.exprs, expected_exprs1);
assert_eq!(pred, &del_pred1);
// add another delete predicate = simulate a second delete
// Build delete predicate and expected output
let expr2 = col("cost").not_eq(lit(15));
let del_pred2 = PredicateBuilder::new()
.table("test")
.timestamp_range(20, 50)
.add_expr(expr2)
.build();
let mut expected_exprs2 = vec![];
let e = col("cost").not_eq(lit(15));
expected_exprs2.push(e);
chunk.add_delete_predicate(Arc::new(del_pred2));
let del_pred2 = Arc::new(DeletePredicate {
table_names: Some(IntoIterator::into_iter(["test".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 20, end: 50 },
exprs: vec![DeleteExpr::new(
"cost".to_string(),
predicate::delete_expr::Op::Ne,
predicate::delete_expr::Scalar::I64(15),
)],
});
chunk.add_delete_predicate(Arc::clone(&del_pred2));
// chunk still must be in frozen stage now
assert_eq!(chunk.stage().name(), "Frozen");
// chunk must have 2 delete predicates
@ -1129,13 +1127,7 @@ mod tests {
assert_eq!(del_preds.len(), 2);
// verify the second delete predicate value
let pred = &del_preds[1];
if let Some(range) = pred.range {
assert_eq!(range.start, 20); // start time
assert_eq!(range.end, 50); // stop time
} else {
panic!("No time range set for delete predicate");
}
assert_eq!(pred.exprs, expected_exprs2);
assert_eq!(pred, &del_pred2);
}
fn make_mb_chunk(table_name: &str) -> MBChunk {

View File

@ -13,7 +13,7 @@ use observability_deps::tracing::info;
use persistence_windows::{
min_max_sequence::OptionalMinMaxSequence, persistence_windows::PersistenceWindows,
};
use predicate::predicate::Predicate;
use predicate::delete_predicate::DeletePredicate;
use snafu::{OptionExt, Snafu};
use std::{collections::BTreeMap, fmt::Display, sync::Arc};
use tracker::RwLock;
@ -231,7 +231,7 @@ impl Partition {
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
schema: Arc<Schema>,
delete_predicates: Vec<Arc<Predicate>>,
delete_predicates: Vec<Arc<DeletePredicate>>,
chunk_order: ChunkOrder,
) -> (ChunkId, &Arc<RwLock<CatalogChunk>>) {
let chunk_id = self.next_chunk_id();
@ -273,7 +273,7 @@ impl Partition {
chunk: Arc<parquet_file::chunk::ParquetChunk>,
time_of_first_write: DateTime<Utc>,
time_of_last_write: DateTime<Utc>,
delete_predicates: Vec<Arc<Predicate>>,
delete_predicates: Vec<Arc<DeletePredicate>>,
chunk_order: ChunkOrder,
) -> &Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name(), self.table_name());

View File

@ -18,7 +18,10 @@ use mutable_buffer::chunk::snapshot::ChunkSnapshot;
use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
use partition_metadata::TableSummary;
use predicate::predicate::{Predicate, PredicateMatch};
use predicate::{
delete_predicate::DeletePredicate,
predicate::{Predicate, PredicateMatch},
};
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
use read_buffer::RBChunk;
use snafu::{OptionExt, ResultExt, Snafu};
@ -546,7 +549,7 @@ impl QueryChunkMeta for DbChunk {
}
// return a reference to the delete predicates of the chunk
fn delete_predicates(&self) -> &[Arc<Predicate>] {
fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
let pred = &self.meta.delete_predicates;
debug!(?pred, "Delete predicate in DbChunk");

View File

@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
use data_types::{chunk_metadata::ChunkOrder, job::Job};
use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info;
use predicate::predicate::Predicate;
use predicate::delete_predicate::DeletePredicate;
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@ -47,7 +47,7 @@ pub(crate) fn compact_chunks(
let mut input_rows = 0;
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut delete_predicates: Vec<Arc<DeletePredicate>> = vec![];
let mut min_order = ChunkOrder::MAX;
let query_chunks = chunks
.into_iter()
@ -157,8 +157,8 @@ mod tests {
use crate::db::test_helpers::write_lp;
use crate::{db::test_helpers::write_lp_with_time, utils::make_db};
use data_types::chunk_metadata::ChunkStorage;
use data_types::timestamp::TimestampRange;
use lifecycle::{LockableChunk, LockablePartition};
use predicate::predicate::PredicateBuilder;
use query::QueryDatabase;
#[tokio::test]
@ -235,13 +235,19 @@ mod tests {
assert_eq!(partition_keys.len(), 1);
// Cannot simply use an empty predicate (#2687)
let predicate = PredicateBuilder::new()
.table("cpu")
.timestamp_range(0, 1_000)
.build();
let predicate = Arc::new(DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange {
start: 0,
end: 1_000,
},
exprs: vec![],
});
// Delete everything
db.delete("cpu", Arc::new(predicate)).await.unwrap();
db.delete("cpu", predicate).await.unwrap();
let chunk = db
.compact_partition("cpu", partition_keys[0].as_str())
.await

View File

@ -11,7 +11,7 @@ use data_types::{chunk_metadata::ChunkOrder, job::Job};
use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition};
use observability_deps::tracing::info;
use persistence_windows::persistence_windows::FlushHandle;
use predicate::predicate::Predicate;
use predicate::delete_predicate::DeletePredicate;
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use std::{future::Future, sync::Arc};
use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
@ -56,7 +56,7 @@ where
let mut time_of_first_write: Option<DateTime<Utc>> = None;
let mut time_of_last_write: Option<DateTime<Utc>> = None;
let mut query_chunks = vec![];
let mut delete_predicates: Vec<Arc<Predicate>> = vec![];
let mut delete_predicates: Vec<Arc<DeletePredicate>> = vec![];
let mut min_order = ChunkOrder::MAX;
for mut chunk in chunks {
// Sanity-check
@ -213,10 +213,9 @@ mod tests {
use super::*;
use crate::{db::test_helpers::write_lp, utils::TestDb, Db};
use chrono::{TimeZone, Utc};
use data_types::chunk_metadata::ChunkStorage;
use data_types::database_rules::LifecycleRules;
use data_types::{chunk_metadata::ChunkStorage, timestamp::TimestampRange};
use lifecycle::{LockableChunk, LockablePartition};
use predicate::predicate::PredicateBuilder;
use query::QueryDatabase;
use std::{
num::{NonZeroU32, NonZeroU64},
@ -303,12 +302,15 @@ mod tests {
let partition = db.partition("cpu", partition_key.as_str()).unwrap();
// Delete first row
let predicate = PredicateBuilder::new()
.table("cpu")
.timestamp_range(0, 20)
.build();
let predicate = Arc::new(DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange { start: 0, end: 20 },
exprs: vec![],
});
db.delete("cpu", Arc::new(predicate)).await.unwrap();
db.delete("cpu", predicate).await.unwrap();
// Try to persist first write but it has been deleted
let maybe_chunk = db
@ -366,12 +368,18 @@ mod tests {
assert_eq!(chunks[1].row_count, 2);
// Delete everything
let predicate = PredicateBuilder::new()
.table("cpu")
.timestamp_range(0, 1000)
.build();
let predicate = Arc::new(DeletePredicate {
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
field_columns: None,
partition_key: None,
range: TimestampRange {
start: 0,
end: 1_000,
},
exprs: vec![],
});
db.delete("cpu", Arc::new(predicate)).await.unwrap();
db.delete("cpu", predicate).await.unwrap();
// Try to persist third set of writes
let maybe_chunk = db

View File

@ -26,6 +26,7 @@ use persistence_windows::{
checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder},
persistence_windows::FlushHandle,
};
use predicate::predicate::Predicate;
use query::{QueryChunk, QueryChunkMeta};
use snafu::ResultExt;
use std::{future::Future, sync::Arc};
@ -92,12 +93,13 @@ where
collect_checkpoints(flush_handle.checkpoint(), &db.catalog);
// Get RecordBatchStream of data from the read buffer chunk
let del_preds: Vec<Arc<Predicate>> = db_chunk
.delete_predicates()
.iter()
.map(|pred| Arc::new(pred.as_ref().clone().into()))
.collect();
let stream = db_chunk
.read_filter(
&Default::default(),
Selection::All,
db_chunk.delete_predicates(),
)
.read_filter(&Default::default(), Selection::All, &del_preds)
.expect("read filter should be infallible");
// check that the upcoming state change will very likely succeed

View File

@ -15,7 +15,7 @@ use parquet_file::{
chunk::{ChunkMetrics as ParquetChunkMetrics, ParquetChunk},
};
use persistence_windows::checkpoint::{ReplayPlan, ReplayPlanner};
use predicate::predicate::Predicate;
use predicate::delete_predicate::DeletePredicate;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
@ -228,7 +228,7 @@ impl CatalogState for Loader {
// Delete predicates are loaded explicitly via `CatalogState::delete_predicates` AFTER the chunk is added, so
// we leave this list empty (for now).
let delete_predicates: Vec<Arc<Predicate>> = vec![];
let delete_predicates: Vec<Arc<DeletePredicate>> = vec![];
partition.insert_object_store_only_chunk(
iox_md.chunk_id,
@ -276,7 +276,7 @@ impl CatalogState for Loader {
fn delete_predicate(
&mut self,
predicate: Arc<Predicate>,
predicate: Arc<DeletePredicate>,
chunks: Vec<ChunkAddrWithoutDatabase>,
) {
for addr in chunks {

View File

@ -26,7 +26,7 @@ use data_types::{
};
use influxdb_iox_client::format::QueryOutputFormat;
use influxdb_line_protocol::parse_lines;
use predicate::predicate::{parse_delete, ParseDeletePredicate};
use predicate::delete_predicate::{parse_delete, ParseDeletePredicate};
use query::exec::ExecutionContextProvider;
use server::{ApplicationState, ConnectionManager, Error, Server as AppServer};
@ -163,13 +163,13 @@ pub enum ApplicationError {
#[snafu(display("Error parsing delete {}: {}", input, source))]
ParsingDelete {
source: predicate::predicate::Error,
source: predicate::delete_predicate::Error,
input: String,
},
#[snafu(display("Error building delete predicate {}: {}", input, source))]
BuildingDeletePredicate {
source: predicate::predicate::Error,
source: predicate::delete_predicate::Error,
input: String,
},
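
Both variants pair a `source` with the raw `input`, matching snafu's context-selector pattern. A hedged sketch of how the handler might attach them; the `parse_delete` call shape here is an assumption, not shown in this hunk:

// Assumes `parse_delete(&str) -> Result<_, predicate::delete_predicate::Error>`.
let parsed = parse_delete(&input).context(ParsingDelete {
    input: input.clone(),
})?;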

View File

@ -7,7 +7,7 @@ use data_types::chunk_metadata::ChunkId;
use data_types::{server_id::ServerId, DatabaseName};
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
use predicate::predicate::ParseDeletePredicate;
use predicate::delete_predicate::ParseDeletePredicate;
use query::QueryDatabase;
use server::rules::ProvidedDatabaseRules;
use server::{ApplicationState, ConnectionManager, Error, Server};