Merge pull request #2723 from influxdata/crepererum/in_mem_expr_part4
refactor: remove unused fields from `DeletePredicate` and remove intermediate `ParseDeletePredicate`
commit
0a83379e23
@@ -3,15 +3,14 @@ package influxdata.iox.catalog.v1;
 // Represents a parsed predicate for evaluation by the InfluxDB IOx query engine.
 message Predicate {
-  // Optional table restriction. If present, restricts the results to only tables these tables.
-  OptionalStringSet table_names = 1;
+  // Was `table_names`.
+  reserved 1;
 
-  // Optional field restriction. If present, restricts the results to only tables which have *at least one* of the
-  // fields in field_columns.
-  OptionalStringSet field_columns = 2;
+  // Was `field_columns`.
+  reserved 2;
 
-  // Optional partition key filter
-  OptionalString partition_key = 3;
+  // Was `partition_key`.
+  reserved 3;
 
   // Optional timestamp range: only rows within this range are included in results. Other rows are excluded.
   TimestampRange range = 4;
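The freed field numbers are kept `reserved`, so the message now effectively carries only the time range plus the expression list. A minimal Rust sketch of building such a slimmed-down protobuf predicate from the prost-generated types; relying on `..Default::default()` for the remaining fields is an assumption about the generated code, not something shown in this diff:

    use generated_types::influxdata::iox::catalog::v1 as proto;

    /// Build a predicate that only restricts the time range; every other field
    /// (including the removed/reserved ones) keeps its prost-generated default.
    fn range_only_predicate(start: i64, end: i64) -> proto::Predicate {
        proto::Predicate {
            range: Some(proto::TimestampRange { start, end }),
            ..Default::default()
        }
    }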
@@ -1714,12 +1714,12 @@ mod tests {
 }
 
 // create two predicate
-let predicate_1 = create_delete_predicate(&chunk_addrs[0].table_name, 42);
+let predicate_1 = create_delete_predicate(42);
 let chunks_1 = vec![chunk_addrs[0].clone().into()];
 t.delete_predicate(&predicate_1, &chunks_1).unwrap();
 state.delete_predicate(predicate_1, chunks_1);
 
-let predicate_2 = create_delete_predicate(&chunk_addrs[0].table_name, 1337);
+let predicate_2 = create_delete_predicate(1337);
 let chunks_2 = vec![chunk_addrs[0].clone().into(), chunk_addrs[1].clone().into()];
 t.delete_predicate(&predicate_2, &chunks_2).unwrap();
 state.delete_predicate(predicate_2, chunks_2);
@@ -525,13 +525,13 @@ where
 expected_files.insert(chunk_addr_2.chunk_id, (path, Arc::new(metadata)));
 
 // first predicate used only a single chunk
-let predicate_1 = create_delete_predicate(&chunk_addr_1.table_name, 1);
+let predicate_1 = create_delete_predicate(1);
 let chunks_1 = vec![chunk_addr_1.clone().into()];
 state.delete_predicate(Arc::clone(&predicate_1), chunks_1.clone());
 expected_predicates.push((predicate_1, chunks_1));
 
 // second predicate uses both chunks (but not the older chunks)
-let predicate_2 = create_delete_predicate(&chunk_addr_2.table_name, 2);
+let predicate_2 = create_delete_predicate(2);
 let chunks_2 = vec![chunk_addr_1.into(), chunk_addr_2.into()];
 state.delete_predicate(Arc::clone(&predicate_2), chunks_2.clone());
 expected_predicates.push((predicate_2, chunks_2));
@@ -579,7 +579,7 @@ where
 // Registering predicates for unknown chunks is just ignored because chunks might been in "persisting" intermediate
 // state while the predicate was reported.
 {
-let predicate = create_delete_predicate("some_table", 1);
+let predicate = create_delete_predicate(1);
 let chunks = vec![ChunkAddrWithoutDatabase {
 table_name: Arc::from("some_table"),
 partition_key: Arc::from("part"),
@@ -638,14 +638,8 @@ fn get_sorted_keys<'a>(
 }
 
 /// Helper to create a simple delete predicate.
-pub fn create_delete_predicate(table_name: &str, value: i64) -> Arc<DeletePredicate> {
+pub fn create_delete_predicate(value: i64) -> Arc<DeletePredicate> {
     Arc::new(DeletePredicate {
-        table_names: Some(
-            IntoIterator::into_iter([table_name.to_string(), format!("not_{}", table_name)])
-                .collect(),
-        ),
-        field_columns: None,
-        partition_key: None,
         range: TimestampRange { start: 11, end: 22 },
         exprs: vec![DeleteExpr::new(
             "foo".to_string(),
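With the table-name argument gone, the test helper only needs the marker value. A small usage sketch under the assumption (suggested by the truncated body above) that the `i64` argument ends up inside the single expression on column "foo":

    let predicate = create_delete_predicate(42);
    // Fixed time range from the helper body shown above.
    assert_eq!(predicate.range, TimestampRange { start: 11, end: 22 });
    // Assumed: exactly one expression, carrying the marker value.
    assert_eq!(predicate.exprs.len(), 1);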
@@ -1,4 +1,4 @@
-use std::{collections::BTreeSet, convert::TryInto};
+use std::convert::TryInto;
 
 use chrono::DateTime;
 use data_types::timestamp::TimestampRange;
@@ -70,17 +70,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// query engine.
 #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub struct DeletePredicate {
-    /// Optional table restriction. If present, restricts the results
-    /// to only tables whose names are in `table_names`
-    pub table_names: Option<BTreeSet<String>>,
-
-    /// Optional field restriction. If present, restricts the results to only
-    /// tables which have *at least one* of the fields in field_columns.
-    pub field_columns: Option<BTreeSet<String>>,
-
-    /// Optional partition key filter
-    pub partition_key: Option<String>,
-
     /// Only rows within this range are included in
     /// results. Other rows are excluded.
     pub range: TimestampRange,
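After this change a `DeletePredicate` is nothing more than a time range plus a list of column/value expressions. A hedged sketch of constructing one directly; the module paths and the `Op`/`Scalar` names are assumed from the imports and test code elsewhere in this PR:

    use data_types::timestamp::TimestampRange;
    use predicate::delete_expr::{DeleteExpr, Op, Scalar};
    use predicate::delete_predicate::DeletePredicate;

    /// "delete rows where bar = 1 and 0 <= time <= 30" -- only range + exprs remain.
    fn delete_bar_equals_one() -> DeletePredicate {
        DeletePredicate {
            range: TimestampRange { start: 0, end: 30 },
            exprs: vec![DeleteExpr::new("bar".to_string(), Op::Eq, Scalar::I64(1))],
        }
    }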
@@ -93,254 +82,209 @@ pub struct DeletePredicate {
     pub exprs: Vec<DeleteExpr>,
 }
 
+impl DeletePredicate {
+    /// Parse and convert the delete grpc API into ParseDeletePredicate to send to server
+    pub fn try_new(start: &str, stop: &str, predicate: &str) -> Result<Self> {
+        // parse and check time range
+        let (start_time, stop_time) = parse_time_range(start, stop)?;
+
+        // Parse the predicate
+        let delete_exprs = parse_predicate(predicate)?;
+
+        Ok(Self {
+            range: TimestampRange {
+                start: start_time,
+                end: stop_time,
+            },
+            exprs: delete_exprs,
+        })
+    }
+}
+
 impl From<DeletePredicate> for crate::predicate::Predicate {
     fn from(pred: DeletePredicate) -> Self {
         Self {
-            table_names: pred.table_names,
-            field_columns: pred.field_columns,
-            partition_key: pred.partition_key,
+            table_names: None,
+            field_columns: None,
+            partition_key: None,
             range: Some(pred.range),
             exprs: pred.exprs.into_iter().map(|expr| expr.into()).collect(),
         }
     }
 }
 
-/// Parser for Delete predicate and time range
-#[derive(Debug, Clone)]
-pub struct ParseDeletePredicate {
-    pub start_time: i64,
-    pub stop_time: i64,
-    // conjunctive predicate of binary expressions of = or !=
-    pub predicate: Vec<DeleteExpr>,
-}
-
-impl ParseDeletePredicate {
-    /// Create a ParseDeletePredicate
-    pub fn new(start_time: i64, stop_time: i64, predicate: Vec<DeleteExpr>) -> Self {
-        Self {
-            start_time,
-            stop_time,
-            predicate,
-        }
-    }
-
-    /// Parse and convert the delete grpc API into ParseDeletePredicate to send to server
-    pub fn try_new(start: &str, stop: &str, predicate: &str) -> Result<Self> {
-        // parse and check time range
-        let (start_time, stop_time) = Self::parse_time_range(start, stop)?;
-
-        // Parse the predicate
-        let delete_exprs = Self::parse_predicate(predicate)?;
-
-        Ok(Self::new(start_time, stop_time, delete_exprs))
-    }
-
 /// Parse the predicate and convert it into datafusion expression
 /// A delete predicate is a conjunctive expression of many
 /// binary expressions of 'colum = constant' or 'column != constant'
 ///
-    pub fn parse_predicate(predicate: &str) -> Result<Vec<DeleteExpr>> {
+fn parse_predicate(predicate: &str) -> Result<Vec<DeleteExpr>> {
     if predicate.is_empty() {
         return Ok(vec![]);
     }
 
     // "DELETE FROM table_name WHERE predicate"
     // Table name can be anything to have sqlparser work on the right sql syntax
     let mut sql = "DELETE FROM table_name WHERE ".to_string();
     sql.push_str(predicate);
 
     // parse the delete sql
     let dialect = GenericDialect {};
     let ast = Parser::parse_sql(&dialect, sql.as_str());
     match ast {
         Err(parse_err) => {
             let error_str = format!("{}, {}", predicate, parse_err);
             Err(Error::InvalidSyntax { value: error_str })
         }
         Ok(mut stmt) => {
             if stmt.len() != 1 {
                 return Err(Error::InvalidSemantics {
                     value: predicate.to_string(),
                 });
             }
 
             let stmt = stmt.pop();
             match stmt {
                 Some(Statement::Delete {
                     table_name: _,
                     selection: Some(expr),
                     ..
                 }) => {
                     // split this expr into smaller binary if any
                     let mut exprs = vec![];
-                    let split = Self::split_members(&expr, &mut exprs);
+                    let split = split_members(&expr, &mut exprs);
                     if !split {
                         return Err(Error::NotSupportPredicate {
                             value: predicate.to_string(),
                         });
                     }
                     Ok(exprs)
                 }
                 _ => Err(Error::InvalidSemantics {
                     value: predicate.to_string(),
                 }),
             }
         }
     }
 }
 
-    pub fn build_delete_predicate(
-        start_time: String,
-        stop_time: String,
-        predicate: String,
-    ) -> Result<DeletePredicate, Error> {
-        // parse time range and the predicate
-        let parse_delete_pred =
-            Self::try_new(start_time.as_str(), stop_time.as_str(), predicate.as_str())?;
-
-        Ok(parse_delete_pred.into())
-    }
-
 /// Recursively split all "AND" expressions into smaller ones
 /// Example: "A AND B AND C" => [A, B, C]
 /// Return false if not all of them are AND of binary expression of
 /// "column_name = literal" or "column_name != literal"
 ///
 /// The split expressions will be converted into data fusion expressions
-    pub fn split_members(predicate: &SqlParserExpr, predicates: &mut Vec<DeleteExpr>) -> bool {
+fn split_members(predicate: &SqlParserExpr, predicates: &mut Vec<DeleteExpr>) -> bool {
     // The below code built to be compatible with
     // https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go
     match predicate {
         SqlParserExpr::BinaryOp {
             left,
             op: BinaryOperator::And,
             right,
         } => {
-            if !Self::split_members(left, predicates) {
+            if !split_members(left, predicates) {
                 return false;
             }
-            if !Self::split_members(right, predicates) {
+            if !split_members(right, predicates) {
                 return false;
             }
         }
         SqlParserExpr::BinaryOp { left, op, right } => {
             // Verify Operator
             let op = match op {
                 BinaryOperator::Eq => Operator::Eq,
                 BinaryOperator::NotEq => Operator::NotEq,
                 _ => return false,
             };
 
             // verify if left is identifier (column name)
             let column = match &**left {
                 SqlParserExpr::Identifier(Ident {
                     value,
                     quote_style: _, // all quotes are ignored as done in idpe
                 }) => Expr::Column(Column {
                     relation: None,
                     name: value.to_string(),
                 }),
                 _ => return false, // not a column name
             };
 
             // verify if right is a literal or an identifier (e.g column name)
             let value = match &**right {
                 SqlParserExpr::Identifier(Ident {
                     value,
                     quote_style: _,
                 }) => lit(value.to_string()),
                 SqlParserExpr::Value(Value::DoubleQuotedString(value)) => lit(value.to_string()),
                 SqlParserExpr::Value(Value::SingleQuotedString(value)) => lit(value.to_string()),
                 SqlParserExpr::Value(Value::NationalStringLiteral(value)) => lit(value.to_string()),
                 SqlParserExpr::Value(Value::HexStringLiteral(value)) => lit(value.to_string()),
                 SqlParserExpr::Value(Value::Number(v, _)) => match v.parse::<i64>() {
                     Ok(v) => lit(v),
                     Err(_) => lit(v.parse::<f64>().unwrap()),
                 },
                 SqlParserExpr::Value(Value::Boolean(v)) => lit(*v),
                 _ => return false, // not a literal
             };
 
             let expr = Expr::BinaryExpr {
                 left: Box::new(column),
                 op,
                 right: Box::new(value),
             };
             let expr: Result<DeleteExpr, _> = expr.try_into();
             match expr {
                 Ok(expr) => {
                     predicates.push(expr);
                 }
                 Err(_) => {
                     // cannot convert
                     return false;
                 }
             }
         }
         _ => return false,
     }
 
     true
 }
 
 /// Parse a time and return its time in nanosecond
-    pub fn parse_time(input: &str) -> Result<i64> {
+fn parse_time(input: &str) -> Result<i64> {
     // This input can be in timestamp form that end with Z such as 1970-01-01T00:00:00Z
     // See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame
     let datetime_result = DateTime::parse_from_rfc3339(input);
     match datetime_result {
         Ok(datetime) => Ok(datetime.timestamp_nanos()),
         Err(timestamp_err) => {
             // See if it is in nanosecond form
             let time_result = input.parse::<i64>();
             match time_result {
                 Ok(nano) => Ok(nano),
                 Err(nano_err) => {
                     // wrong format, return both error
                     let error_str = format!("{}, {}", timestamp_err, nano_err);
                     Err(Error::InvalidTimestamp { value: error_str })
                 }
             }
         }
     }
 }
 
 /// Parse a time range [start, stop]
-    pub fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> {
-        let start_time = Self::parse_time(start)?;
-        let stop_time = Self::parse_time(stop)?;
+fn parse_time_range(start: &str, stop: &str) -> Result<(i64, i64)> {
+    let start_time = parse_time(start)?;
+    let stop_time = parse_time(stop)?;
     if start_time > stop_time {
         return Err(Error::InvalidTimeRange {
             start: start.to_string(),
             stop: stop.to_string(),
         });
     }
 
     Ok((start_time, stop_time))
 }
-}
-
-impl From<ParseDeletePredicate> for DeletePredicate {
-    fn from(pred: ParseDeletePredicate) -> Self {
-        Self {
-            table_names: None,
-            field_columns: None,
-            partition_key: None,
-            range: TimestampRange {
-                start: pred.start_time,
-                end: pred.stop_time,
-            },
-            exprs: pred.predicate,
-        }
-    }
-}
 
 // Note that this struct and its functions are used to parse FLUX DELETE,
 // https://docs.influxdata.com/influxdb/v2.0/write-data/delete-data/, which happens before
 // the parsing of timestamps and sql predicate. The examples below will show FLUX DELETE's syntax which is
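The public entry point is now `DeletePredicate::try_new(start, stop, predicate)` rather than going through `ParseDeletePredicate` and `build_delete_predicate`. A minimal sketch of the new call path, mirroring the values used in the tests below (start/stop accept either nanosecond integers or RFC3339 timestamps):

    #[test]
    fn delete_predicate_try_new_sketch() {
        // RFC3339 start, nanosecond stop, conjunctive `column = value` / `column != value` predicate.
        let pred = DeletePredicate::try_new("1970-01-01T00:00:00Z", "200", r#"cost != 100"#)
            .expect("valid delete predicate");
        assert_eq!(pred.range.start, 0);
        assert_eq!(pred.range.end, 200);
        assert_eq!(pred.exprs.len(), 1);
    }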
@@ -453,19 +397,19 @@ mod tests {
 fn test_time_range_valid() {
 let start = r#"100"#;
 let stop = r#"100"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
+let result = parse_time_range(start, stop).unwrap();
 let expected = (100, 100);
 assert_eq!(result, expected);
 
 let start = r#"100"#;
 let stop = r#"200"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
+let result = parse_time_range(start, stop).unwrap();
 let expected = (100, 200);
 assert_eq!(result, expected);
 
 let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
 let stop = r#"1970-01-01T00:00:00Z"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
+let result = parse_time_range(start, stop).unwrap();
 let expected = (0, 0);
 assert_eq!(result, expected);
@@ -477,13 +421,13 @@ mod tests {
 
 let start = r#"1970-01-01T00:00:00Z"#;
 let stop = r#"100"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
+let result = parse_time_range(start, stop).unwrap();
 let expected = (0, 100);
 assert_eq!(result, expected);
 
 let start = r#"1970-01-01T00:00:00Z"#;
 let stop = r#"1970-01-01T00:01:00Z"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
+let result = parse_time_range(start, stop).unwrap();
 let expected = (0, 60000000000);
 assert_eq!(result, expected);
 }
@@ -492,72 +436,72 @@ mod tests {
 fn test_time_range_invalid() {
 let start = r#"100"#;
 let stop = r#"-100"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop);
+let result = parse_time_range(start, stop);
 assert!(result.is_err());
 
 let start = r#"100"#;
 let stop = r#"50"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop);
+let result = parse_time_range(start, stop);
 assert!(result.is_err());
 
 let start = r#"100"#;
 let stop = r#"1970-01-01T00:00:00Z"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop);
+let result = parse_time_range(start, stop);
 assert!(result.is_err());
 
 let start = r#"1971-09-01T00:00:10Z"#;
 let stop = r#"1971-09-01T00:00:05Z"#;
-let result = ParseDeletePredicate::parse_time_range(start, stop);
+let result = parse_time_range(start, stop);
 assert!(result.is_err());
 }
 
 #[test]
 fn test_parse_timestamp() {
 let input = r#"123"#;
-let time = ParseDeletePredicate::parse_time(input).unwrap();
+let time = parse_time(input).unwrap();
 assert_eq!(time, 123);
 
 // must parse time
 let input = r#"1970-01-01T00:00:00Z"#;
-let time = ParseDeletePredicate::parse_time(input).unwrap();
+let time = parse_time(input).unwrap();
 assert_eq!(time, 0);
 
 let input = r#"1971-02-01T15:30:21Z"#;
-let time = ParseDeletePredicate::parse_time(input).unwrap();
+let time = parse_time(input).unwrap();
 assert_eq!(time, 34270221000000000);
 }
 
 #[test]
 fn test_parse_timestamp_negative() {
 let input = r#"-123"#;
-let time = ParseDeletePredicate::parse_time(input).unwrap();
+let time = parse_time(input).unwrap();
 assert_eq!(time, -123);
 }
 
 #[test]
 fn test_parse_timestamp_invalid() {
 let input = r#"123gdb"#;
-ParseDeletePredicate::parse_time(input).unwrap_err();
+parse_time(input).unwrap_err();
 
 let input = r#"1970-01-01T00:00:00"#;
-ParseDeletePredicate::parse_time(input).unwrap_err();
+parse_time(input).unwrap_err();
 
 // It turn out this is not invalid but return1 1971
 let input = r#"1971-02-01:30:21Z"#;
-ParseDeletePredicate::parse_time(input).unwrap_err();
+parse_time(input).unwrap_err();
 }
 
 #[test]
 fn test_parse_timestamp_out_of_range() {
 let input = r#"99999999999999999999999999999999"#;
-let time = ParseDeletePredicate::parse_time(input);
+let time = parse_time(input);
 assert!(time.is_err());
 }
 
 #[test]
 fn test_parse_predicate() {
 let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#;
-let result = ParseDeletePredicate::parse_predicate(pred).unwrap();
+let result = parse_predicate(pred).unwrap();
 
 println!("{:#?}", result);
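For readers not running the test above: `parse_predicate` splits the conjunction into one `DeleteExpr` per comparison, and an unquoted right-hand identifier is treated the same as a quoted string literal. A hedged sketch of that behaviour through the public `DeletePredicate::try_new`; the exact `Scalar` variants for strings and floats are not shown in this diff, so only the equality of the two spellings is asserted:

    #[test]
    fn unquoted_rhs_is_a_string_literal_sketch() {
        let quoted = DeletePredicate::try_new("0", "100", r#"city = 'Boston'"#).unwrap();
        let unquoted = DeletePredicate::try_new("0", "100", r#"city = Boston"#).unwrap();
        // Both spellings yield the same single expression on column "city".
        assert_eq!(quoted.exprs, unquoted.exprs);
        assert_eq!(quoted.exprs.len(), 1);
    }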
@@ -582,27 +526,27 @@ mod tests {
 #[test]
 fn test_parse_predicate_invalid() {
 let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR
-let result = ParseDeletePredicate::parse_predicate(pred);
+let result = parse_predicate(pred);
 assert!(result.is_err());
 
 let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1
-let result = ParseDeletePredicate::parse_predicate(pred);
+let result = parse_predicate(pred);
 assert!(result.is_err());
 
 let pred = r#"cost > 100"#; // >
-let result = ParseDeletePredicate::parse_predicate(pred);
+let result = parse_predicate(pred);
 assert!(result.is_err());
 
 let pred = r#"cost <= 100"#; // <
-let result = ParseDeletePredicate::parse_predicate(pred);
+let result = parse_predicate(pred);
 assert!(result.is_err());
 
 let pred = r#"cost gt 100"#; // >
-let result = ParseDeletePredicate::parse_predicate(pred);
+let result = parse_predicate(pred);
 assert!(result.is_err());
 
 let pred = r#"city = cost = 100"#; // >
-let result = ParseDeletePredicate::parse_predicate(pred);
+let result = parse_predicate(pred);
 assert!(result.is_err());
 }
@@ -612,16 +556,16 @@ mod tests {
 let stop = r#"200"#;
 let pred = r#"cost != 100"#;
 
-let result = ParseDeletePredicate::try_new(start, stop, pred).unwrap();
-assert_eq!(result.start_time, 0);
-assert_eq!(result.stop_time, 200);
+let result = DeletePredicate::try_new(start, stop, pred).unwrap();
+assert_eq!(result.range.start, 0);
+assert_eq!(result.range.end, 200);
 
 let expected = vec![DeleteExpr::new(
 "cost".to_string(),
 Op::Ne,
 Scalar::I64(100),
 )];
-assert_eq!(result.predicate, expected);
+assert_eq!(result.exprs, expected);
 }
 
 #[test]
@@ -630,7 +574,7 @@ mod tests {
 let stop = r#"50"#;
 let pred = r#"cost != 100"#;
 
-let result = ParseDeletePredicate::try_new(start, stop, pred);
+let result = DeletePredicate::try_new(start, stop, pred);
 assert!(result.is_err());
 }
 
@@ -640,7 +584,7 @@ mod tests {
 let stop = r#"200"#;
 let pred = r#"cost > 100"#;
 
-let result = ParseDeletePredicate::try_new(start, stop, pred);
+let result = DeletePredicate::try_new(start, stop, pred);
 assert!(result.is_err());
 }
 
@@ -6,7 +6,7 @@
 //!
 //! [Ballista]: https://github.com/apache/arrow-datafusion/blob/22fcb3d7a68a56afbe12eab9e7d98f7b8de33703/ballista/rust/core/proto/ballista.proto
 //! [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3
-use std::{collections::BTreeSet, convert::TryInto};
+use std::convert::TryInto;
 
 use data_types::timestamp::TimestampRange;
 use generated_types::influxdata::iox::catalog::v1 as proto;
@@ -17,9 +17,6 @@ use crate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate};
 /// Serialize IOx [`DeletePredicate`] to a protobuf object.
 pub fn serialize(predicate: &DeletePredicate) -> proto::Predicate {
     proto::Predicate {
-        table_names: serialize_optional_string_set(&predicate.table_names),
-        field_columns: serialize_optional_string_set(&predicate.field_columns),
-        partition_key: serialize_optional_string(&predicate.partition_key),
         range: Some(proto::TimestampRange {
             start: predicate.range.start,
             end: predicate.range.end,
@@ -32,19 +29,6 @@ pub fn serialize(predicate: &DeletePredicate) -> proto::Predicate {
     }
 }
 
-fn serialize_optional_string_set(
-    set: &Option<BTreeSet<String>>,
-) -> Option<proto::OptionalStringSet> {
-    set.as_ref().map(|set| proto::OptionalStringSet {
-        values: set.iter().cloned().collect(),
-    })
-}
-
-fn serialize_optional_string(s: &Option<String>) -> Option<proto::OptionalString> {
-    s.as_ref()
-        .map(|s| proto::OptionalString { value: s.clone() })
-}
-
 #[derive(Debug, Snafu)]
 pub enum DeserializeError {
     #[snafu(display("timestamp range is missing"))]
@@ -61,9 +45,6 @@ pub fn deserialize(
     proto_predicate: &proto::Predicate,
 ) -> Result<DeletePredicate, DeserializeError> {
     let predicate = DeletePredicate {
-        table_names: deserialize_optional_string_set(&proto_predicate.table_names),
-        field_columns: deserialize_optional_string_set(&proto_predicate.field_columns),
-        partition_key: deserialize_optional_string(&proto_predicate.partition_key),
         range: proto_predicate
             .range
             .as_ref()
@@ -84,41 +65,23 @@ pub fn deserialize(
     Ok(predicate)
 }
 
-fn deserialize_optional_string_set(
-    set: &Option<proto::OptionalStringSet>,
-) -> Option<BTreeSet<String>> {
-    set.as_ref().map(|set| set.values.iter().cloned().collect())
-}
-
-fn deserialize_optional_string(s: &Option<proto::OptionalString>) -> Option<String> {
-    s.as_ref().map(|s| s.value.clone())
-}
-
 #[cfg(test)]
 mod tests {
-    use crate::delete_predicate::ParseDeletePredicate;
-
     use super::*;
 
     #[test]
     fn test_roundtrip() {
-        let table_name = "my_table";
-        let predicate = delete_predicate(table_name);
+        let predicate = delete_predicate();
         let proto = serialize(&predicate);
         let recovered = deserialize(&proto).unwrap();
         assert_eq!(predicate, recovered);
     }
 
-    fn delete_predicate(table_name: &str) -> DeletePredicate {
+    fn delete_predicate() -> DeletePredicate {
         let start_time = "11";
         let stop_time = "22";
         let predicate = r#"city=Boston and cost!=100 and temp=87.5 and good=true"#;
 
-        let parse_delete_pred =
-            ParseDeletePredicate::try_new(start_time, stop_time, predicate).unwrap();
-
-        let mut pred: DeletePredicate = parse_delete_pred.into();
-        pred.table_names = Some(IntoIterator::into_iter([table_name.to_string()]).collect());
-        pred
+        DeletePredicate::try_new(start_time, stop_time, predicate).unwrap()
     }
 }
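With the `OptionalStringSet`/`OptionalString` fields gone, their helper (de)serializers disappear too and a round trip carries nothing but range and exprs. A hedged sketch of pushing such a predicate through actual wire bytes; the `prost::Message` encode/decode calls are an assumption about how the generated `proto::Predicate` is used elsewhere, not something shown in this diff:

    use prost::Message;

    #[test]
    fn roundtrip_through_wire_bytes_sketch() {
        // DeletePredicate -> protobuf struct -> bytes -> protobuf struct -> DeletePredicate.
        let predicate = DeletePredicate::try_new("11", "22", "city=Boston").unwrap();
        let proto = serialize(&predicate);

        let mut buf = Vec::new();
        proto.encode(&mut buf).expect("encoding into a Vec never fails");

        let decoded = proto::Predicate::decode(buf.as_slice()).expect("valid bytes");
        assert_eq!(deserialize(&decoded).unwrap(), predicate);
    }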
@@ -34,9 +34,6 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
 
 // delete predicate
 let pred = DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 10, end: 20 },
 exprs: vec![],
 };

@@ -61,9 +58,6 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
 
 // delete predicate
 let pred = DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 0, end: 15 },
 exprs: vec![DeleteExpr::new(
 "bar".to_string(),

@@ -95,9 +89,6 @@ impl DbSetup for OneDeleteMultiExprsOneChunk {
 ];
 // delete predicate
 let pred = DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 0, end: 30 },
 exprs: vec![
 DeleteExpr::new(

@@ -142,9 +133,6 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
 // delete predicate
 // pred1: delete from cpu where 0 <= time <= 32 and bar = 1 and foo = 'me'
 let pred1 = DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 0, end: 32 },
 exprs: vec![
 DeleteExpr::new(

@@ -162,9 +150,6 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
 
 // pred2: delete from cpu where 10 <= time <= 40 and bar != 1
 let pred2 = DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 10, end: 40 },
 exprs: vec![DeleteExpr::new(
 "bar".to_string(),

@@ -205,9 +190,6 @@ impl DbSetup for ThreeDeleteThreeChunks {
 ];
 // delete predicate on chunk 1
 let pred1 = DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 0, end: 30 },
 exprs: vec![
 DeleteExpr::new(

@@ -232,9 +214,6 @@ impl DbSetup for ThreeDeleteThreeChunks {
 ];
 // delete predicate on chunk 1 & chunk 2
 let pred2 = DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 20, end: 45 },
 exprs: vec![DeleteExpr::new(
 "foo".to_string(),

@@ -252,9 +231,6 @@ impl DbSetup for ThreeDeleteThreeChunks {
 ];
 // delete predicate on chunk 3
 let pred3 = DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 75, end: 95 },
 exprs: vec![DeleteExpr::new(
 "bar".to_string(),
@@ -3682,9 +3682,6 @@ mod tests {
 
 // ==================== do: delete ====================
 let pred = Arc::new(DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange {
 start: 0,
 end: 1_000,
@@ -1084,9 +1084,6 @@ mod tests {
 
 // Build delete predicate and expected output
 let del_pred1 = Arc::new(DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["test".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 0, end: 100 },
 exprs: vec![DeleteExpr::new(
 "city".to_string(),

@@ -1109,9 +1106,6 @@ mod tests {
 // let add more delete predicate = simulate second delete
 // Build delete predicate and expected output
 let del_pred2 = Arc::new(DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["test".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 20, end: 50 },
 exprs: vec![DeleteExpr::new(
 "cost".to_string(),
@@ -236,9 +236,6 @@ mod tests {
 
 // Cannot simply use empty predicate (#2687)
 let predicate = Arc::new(DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange {
 start: 0,
 end: 1_000,
@@ -303,9 +303,6 @@ mod tests {
 
 // Delete first row
 let predicate = Arc::new(DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange { start: 0, end: 20 },
 exprs: vec![],
 });

@@ -369,9 +366,6 @@ mod tests {
 
 // Delete everything
 let predicate = Arc::new(DeletePredicate {
-table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
-field_columns: None,
-partition_key: None,
 range: TimestampRange {
 start: 0,
 end: 1_000,
@@ -26,7 +26,7 @@ use data_types::{
 };
 use influxdb_iox_client::format::QueryOutputFormat;
 use influxdb_line_protocol::parse_lines;
-use predicate::delete_predicate::{parse_delete, ParseDeletePredicate};
+use predicate::delete_predicate::{parse_delete, DeletePredicate};
 use query::exec::ExecutionContextProvider;
 use server::{ApplicationState, ConnectionManager, Error, Server as AppServer};
 
@@ -603,7 +603,7 @@ where
 let db = server.db(&db_name)?;
 
 // Build delete predicate
-let del_predicate = ParseDeletePredicate::build_delete_predicate(start, stop, predicate)
+let del_predicate = DeletePredicate::try_new(&start, &stop, &predicate)
 .context(BuildingDeletePredicate { input: body })?;
 
 // Tables data will be deleted from
@@ -7,7 +7,7 @@ use data_types::chunk_metadata::ChunkId;
 use data_types::{server_id::ServerId, DatabaseName};
 use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
 use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
-use predicate::delete_predicate::ParseDeletePredicate;
+use predicate::delete_predicate::DeletePredicate;
 use query::QueryDatabase;
 use server::rules::ProvidedDatabaseRules;
 use server::{ApplicationState, ConnectionManager, Error, Server};
@@ -623,11 +623,7 @@ where
 .db(&db_name)
 .map_err(default_server_error_handler)?;
 
-let del_predicate_result = ParseDeletePredicate::build_delete_predicate(
-start_time.clone(),
-stop_time.clone(),
-predicate.clone(),
-);
+let del_predicate_result = DeletePredicate::try_new(&start_time, &stop_time, &predicate);
 match del_predicate_result {
 Err(_) => {
 return Err(default_server_error_handler(Error::DeleteExpression {
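Both handlers now call the same constructor, and because `DeletePredicate::try_new` borrows `&str`, the owned `String`s parsed from the request no longer need the three `.clone()` calls. A self-contained sketch of that pattern; the `run_delete` wrapper and its error type are illustrative only, not the server's actual API:

    use predicate::delete_predicate::DeletePredicate;

    /// Illustrative wrapper: the handler keeps ownership of its request fields
    /// and only lends them to the parser.
    fn run_delete(start_time: String, stop_time: String, predicate: String) -> Result<DeletePredicate, String> {
        DeletePredicate::try_new(&start_time, &stop_time, &predicate)
            .map_err(|e| format!("invalid delete expression: {}", e))
    }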