feat: finally have the delete predicate parsed

pull/24376/head
Nga Tran 2021-09-08 17:30:10 -04:00
parent 29b1041305
commit 00df7b064c
14 changed files with 448 additions and 261 deletions

3
Cargo.lock generated
View File

@ -1828,11 +1828,13 @@ name = "influxdb_line_protocol"
version = "0.1.0"
dependencies = [
"chrono",
"datafusion 0.1.0",
"nom",
"observability_deps",
"serde",
"smallvec",
"snafu",
"sqlparser",
"test_helpers",
"thiserror",
]
@ -3328,6 +3330,7 @@ dependencies = [
"regex",
"snafu",
"test_helpers",
"thiserror",
"tokio",
"tokio-stream",
"trace",

View File

@ -455,8 +455,15 @@ message DeleteRequest {
// table name
string table_name = 2;
// a delete info: time range and predicate of other binary expressions
ParseDelete parse_delete = 3;
// start time range
string start_time = 3;
// stop time range
string stop_time = 4;
// predicate
// conjunctive expressions of binary 'column_name = literal' or 'column_ame != literal'
string predicate = 5;
}
message DeleteResponse {

View File

@ -129,7 +129,6 @@ pub mod database_state;
pub mod deleted_database;
pub mod google;
pub mod job;
pub mod parse_delete;
#[cfg(test)]
mod tests {

View File

@ -1,103 +0,0 @@
//! Conversion code to management Delete API structures and vice versa
use crate::{
google::{FieldViolation, FromFieldOpt},
influxdata::iox::management::v1 as management,
};
use influxdb_line_protocol::delete_parser::{
ProvidedDeleteBinaryExpr, ProvidedDeleteOp, ProvidedParseDelete,
};
use std::convert::{TryFrom, TryInto};
/// ProvidedDeleteOp to management API DeleteOp
impl From<ProvidedDeleteOp> for management::DeleteOp {
fn from(op: ProvidedDeleteOp) -> Self {
match op {
ProvidedDeleteOp::Eq => Self::Eq,
ProvidedDeleteOp::NotEq => Self::NotEq,
}
}
}
/// management API DeleteOp to ProvidedDeleteOp
impl TryFrom<management::DeleteOp> for ProvidedDeleteOp {
type Error = FieldViolation;
fn try_from(proto: management::DeleteOp) -> Result<Self, Self::Error> {
match proto {
management::DeleteOp::Eq => Ok(Self::Eq),
management::DeleteOp::NotEq => Ok(Self::NotEq),
management::DeleteOp::Unspecified => Err(FieldViolation::required("")),
}
}
}
/// ProvidedDeleteBinary to management API DeleteBinaryExpr
impl From<ProvidedDeleteBinaryExpr> for management::DeleteBinaryExpr {
fn from(bin_expr: ProvidedDeleteBinaryExpr) -> Self {
let ProvidedDeleteBinaryExpr { column, op, value } = bin_expr;
Self {
column,
op: management::DeleteOp::from(op).into(),
value,
}
}
}
/// management API DeleteBinaryExpr to ProvidedDeleteBinary
impl TryFrom<management::DeleteBinaryExpr> for ProvidedDeleteBinaryExpr {
type Error = FieldViolation;
fn try_from(proto: management::DeleteBinaryExpr) -> Result<Self, Self::Error> {
let management::DeleteBinaryExpr { column, op, value } = proto;
Ok(Self {
column,
op: management::DeleteOp::from_i32(op).required("op")?,
value,
})
}
}
/// ProvidedParseDelete to management API ParseDelete
impl From<ProvidedParseDelete> for management::ParseDelete {
fn from(parse_delete: ProvidedParseDelete) -> Self {
let ProvidedParseDelete {
start_time,
stop_time,
predicate,
} = parse_delete;
Self {
start_time,
stop_time,
exprs: predicate.into_iter().map(Into::into).collect(),
}
}
}
/// management API ParseDelete to ProvidedParseDelete
impl TryFrom<management::ParseDelete> for ProvidedParseDelete {
type Error = FieldViolation;
fn try_from(proto: management::ParseDelete) -> Result<Self, Self::Error> {
let management::ParseDelete {
start_time,
stop_time,
exprs,
} = proto;
let pred_result: Result<Vec<ProvidedDeleteBinaryExpr>, Self::Error> =
exprs.into_iter().map(TryInto::try_into).collect();
let pred = pred_result?;
Ok(Self {
start_time,
stop_time,
predicate: pred,
})
}
}

View File

@ -4,7 +4,6 @@ use self::generated_types::{management_service_client::ManagementServiceClient,
use crate::connection::Connection;
use ::generated_types::google::longrunning::Operation;
use influxdb_line_protocol::delete_parser::{self, ProvidedParseDelete};
use std::convert::TryInto;
use std::num::NonZeroU32;
@ -361,12 +360,6 @@ pub enum DropPartitionError {
/// Errors returned by [`Client::delete`]
#[derive(Debug, Error)]
pub enum DeleteError {
/// Invalid time range or predicate
/// The error message is sent back from the original one which
/// will include the detail
#[error("Invalid input: {}", .0)]
ParseErr(delete_parser::Error),
/// Database not found
#[error("Not found: {}", .0)]
NotFound(String),
@ -967,44 +960,30 @@ impl Client {
&mut self,
db_name: impl Into<String> + Send,
table_name: impl Into<String> + Send,
predicate: impl Into<String> + Send,
start_time: impl Into<String> + Send,
stop_time: impl Into<String> + Send,
predicate: impl Into<String> + Send,
) -> Result<(), DeleteError> {
let db_name = db_name.into();
let table_name = table_name.into();
let predicate = predicate.into();
let start_time = start_time.into();
let stop_time = stop_time.into();
let predicate = predicate.into();
// parse the time range and predicate
let provided_parse_delete_result = ProvidedParseDelete::try_new(
start_time.as_str(),
stop_time.as_str(),
predicate.as_str(),
);
match provided_parse_delete_result {
Err(e) => return Err(DeleteError::ParseErr(e)),
Ok(provided_parse_delete) => {
// Send the delete request to server
let parse_delete = Some(provided_parse_delete.into());
self.inner
.delete(DeleteRequest {
db_name,
table_name,
parse_delete,
})
.await
.map_err(|status| match status.code() {
tonic::Code::NotFound => {
DeleteError::NotFound(status.message().to_string())
}
tonic::Code::Unavailable => DeleteError::Unavailable(status),
_ => DeleteError::ServerError(status),
})?;
}
}
self.inner
.delete(DeleteRequest {
db_name,
table_name,
start_time,
stop_time,
predicate,
})
.await
.map_err(|status| match status.code() {
tonic::Code::NotFound => DeleteError::NotFound(status.message().to_string()),
tonic::Code::Unavailable => DeleteError::Unavailable(status),
_ => DeleteError::ServerError(status),
})?;
// NGA todo: return a handle to the delete?
Ok(())

View File

@ -6,11 +6,13 @@ edition = "2018"
[dependencies] # In alphabetical order
chrono = "0.4"
datafusion = { path = "../datafusion" }
nom = "6"
smallvec = "1.2.0"
snafu = "0.6.2"
observability_deps = { path = "../observability_deps" }
serde = { version = "1.0", features = ["derive"] }
sqlparser = "0.10.0"
thiserror = "1.0.28"
[dev-dependencies] # In alphabetical order

View File

@ -9,11 +9,20 @@
clippy::clone_on_ref_ptr
)]
use std::fmt::Display;
use datafusion::{
logical_plan::{Column, Expr, Operator},
scalar::ScalarValue,
};
use sqlparser::{
ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value},
dialect::GenericDialect,
parser::Parser,
};
use thiserror::Error;
use serde::{Deserialize, Serialize};
//use serde::{Deserialize, Serialize};
use chrono::DateTime;
@ -29,60 +38,35 @@ pub enum Error {
/// Invalid time range
#[error("Invalid time range: ({}, {})", .0, .1)]
InvalidTimeRange(String, String),
/// Predicate syntax error
#[error("Invalid predicate syntax: ({})", .0)]
InvalidSyntax(String),
/// Predicate semantics error
#[error("Invalid predicate semantics: ({})", .0)]
InvalidSemantics(String),
/// Predicate include non supported expression
#[error("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_ame != literal': ({})", .0)]
NotSupportPredicate(String),
}
/// Result type for Parser Cient
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Parser for Delete predicate and time range
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct ProvidedParseDelete {
#[derive(Debug, Clone)]
pub struct ParseDeletePredicate {
pub start_time: i64,
pub stop_time: i64,
// conjunctive predicate of binary expressions of = or !=
pub predicate: Vec<ProvidedDeleteBinaryExpr>,
pub predicate: Vec<Expr>,
}
/// Single Binary expression of delete which
/// in the form of "column = value" or column != value"
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct ProvidedDeleteBinaryExpr {
pub column: String,
pub op: ProvidedDeleteOp,
pub value: String, // NGA Todo: should be enum like FieldValue
}
/// Delete Operator which either "=" or "!="
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)]
pub enum ProvidedDeleteOp {
/// represent "="
Eq,
/// represet "!="
NotEq,
}
impl Display for ProvidedDeleteOp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Eq => write!(f, "Eq"),
Self::NotEq => write!(f, "NotEq"),
}
}
}
impl ProvidedDeleteOp {
/// Return a str representation of this DeleteOp
pub fn as_str(&self) -> &'static str {
match self {
Self::Eq => "Eq",
Self::NotEq => "NotEq",
}
}
}
impl ProvidedParseDelete {
/// Create a ProvidedParseDelete
pub fn new(start_time: i64, stop_time: i64, predicate: Vec<ProvidedDeleteBinaryExpr>) -> Self {
impl ParseDeletePredicate {
/// Create a ParseDeletePredicate
pub fn new(start_time: i64, stop_time: i64, predicate: Vec<Expr>) -> Self {
Self {
start_time,
stop_time,
@ -90,22 +74,152 @@ impl ProvidedParseDelete {
}
}
/// Parse and convert the delete grpc API into ProvidedParseDelete to send to server
pub fn try_new(start: &str, stop: &str, predicate: &str) -> Result<Self> {
/// Parse and convert the delete grpc API into ParseDeletePredicate to send to server
pub fn try_new(table_name: &str, start: &str, stop: &str, predicate: &str) -> Result<Self> {
// parse and check time range
let (start_time, stop_time) = Self::parse_time_range(start, stop)?;
// Parse the predicate
let delete_exprs = Self::parse_predicate(predicate)?;
let delete_exprs = Self::parse_predicate(table_name, predicate)?;
Ok(Self::new(start_time, stop_time, delete_exprs))
}
/// Parse the predicate
// NGA TODO: parse the delete predicate which is a conjunctive expression of many
// binary expressions of 'colum = constant' or 'column != constant'
pub fn parse_predicate(_predicate: &str) -> Result<Vec<ProvidedDeleteBinaryExpr>> {
Ok(vec![])
/// Parse the predicate and convert it into datafusion expression
/// parse the delete predicate which is a conjunctive expression of many
/// binary expressions of 'colum = constant' or 'column != constant'
///
pub fn parse_predicate(table_name: &str, predicate: &str) -> Result<Vec<Expr>> {
if predicate.is_empty() {
return Ok(vec![]);
}
// Now add this predicate string into a DELETE SQL to user sqlparser to parse it
// "DELETE FROM table_name WHERE predicate"
let mut sql = "DELETE FROM ".to_string();
sql.push_str(table_name);
sql.push_str(" WHERE ");
sql.push_str(predicate);
// parse the delete sql
let dialect = GenericDialect {};
let ast = Parser::parse_sql(&dialect, sql.as_str());
match ast {
Err(parse_err) => {
let error_str = format!("{}, {}", predicate, parse_err);
Err(Error::InvalidSyntax(error_str))
}
Ok(mut stmt) => {
if stmt.len() != 1 {
return Err(Error::InvalidSemantics(predicate.to_string()));
}
let stmt = stmt.pop();
match stmt {
Some(Statement::Delete {
table_name: _,
selection: Some(expr),
..
}) => {
// split this expr into smaller binary if any
let mut exprs = vec![];
let split = Self::split_members(table_name, &expr, &mut exprs);
if !split {
return Err(Error::NotSupportPredicate(predicate.to_string()));
}
Ok(exprs)
}
_ => Err(Error::InvalidSemantics(predicate.to_string())),
}
}
}
}
/// Recursively split all "AND" expressions into smaller ones
/// Example: "A AND B AND C" => [A, B, C]
/// Return false if not all of them are AND of binary expression of
/// "column_name = literal" or "column_name != literal"
///
/// The split expressions will be converted into data fusion expressions
pub fn split_members(
table_name: &str,
predicate: &SqlParserExpr,
predicates: &mut Vec<Expr>,
) -> bool {
// Th below code built to be compatible with
// https://github.com/influxdata/idpe/blob/master/influxdbv2/predicate/parser_test.go
match predicate {
SqlParserExpr::BinaryOp {
left,
op: BinaryOperator::And,
right,
} => {
if !Self::split_members(table_name, left, predicates) {
return false;
}
if !Self::split_members(table_name, right, predicates) {
return false;
}
}
SqlParserExpr::BinaryOp { left, op, right } => {
// Verify Operator
let op = match op {
BinaryOperator::Eq => Operator::Eq,
BinaryOperator::NotEq => Operator::NotEq,
_ => return false,
};
// verify if left is identifier (column name)
let column = match &**left {
SqlParserExpr::Identifier(Ident {
value,
quote_style: _, // all quotes are ignored as done in idpe
}) => Expr::Column(Column {
relation: Some(table_name.to_string()),
name: value.to_string(),
}),
_ => return false, // not a column name
};
// verify if right is a literal
let value = match &**right {
SqlParserExpr::Identifier(Ident {
value,
quote_style: _,
}) => Expr::Literal(ScalarValue::Utf8(Some(value.to_string()))),
SqlParserExpr::Value(Value::DoubleQuotedString(value)) => {
Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))
}
SqlParserExpr::Value(Value::SingleQuotedString(value)) => {
Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))
}
SqlParserExpr::Value(Value::NationalStringLiteral(value)) => {
Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))
}
SqlParserExpr::Value(Value::HexStringLiteral(value)) => {
Expr::Literal(ScalarValue::Utf8(Some(value.to_string())))
}
SqlParserExpr::Value(Value::Number(v, _)) => {
Expr::Literal(ScalarValue::Float64(Some(v.parse().unwrap())))
}
// NGA todo: how to now this is an integer?
SqlParserExpr::Value(Value::Boolean(v)) => {
Expr::Literal(ScalarValue::Boolean(Some(*v)))
}
_ => return false, // not a literal
};
let expr = Expr::BinaryExpr {
left: Box::new(column),
op,
right: Box::new(value),
};
predicates.push(expr);
}
_ => return false,
}
true
}
/// Parse a time and return its time in nanosecond
@ -144,43 +258,46 @@ impl ProvidedParseDelete {
#[cfg(test)]
mod test {
use datafusion::logical_plan::{col, lit};
use sqlparser::test_utils::number;
use super::*;
#[test]
fn test_time_range_valid() {
let start = r#"100"#;
let stop = r#"100"#;
let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (100, 100);
assert_eq!(result, expected);
let start = r#"100"#;
let stop = r#"200"#;
let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (100, 200);
assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
let stop = r#"1970-01-01T00:00:00Z"#;
let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 0);
assert_eq!(result, expected);
// let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
// let stop = r#"now()"#; // -- Not working. Need to find a way to test this
// let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
// let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
// let expected = (0, 0);
// assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#;
let stop = r#"100"#;
let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 100);
assert_eq!(result, expected);
let start = r#"1970-01-01T00:00:00Z"#;
let stop = r#"1970-01-01T00:01:00Z"#;
let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
let result = ParseDeletePredicate::parse_time_range(start, stop).unwrap();
let expected = (0, 60000000000);
assert_eq!(result, expected);
}
@ -189,68 +306,66 @@ mod test {
fn test_time_range_invalid() {
let start = r#"100"#;
let stop = r#"-100"#;
let result = ProvidedParseDelete::parse_time_range(start, stop);
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"100"#;
let stop = r#"50"#; // this is nano 0
let result = ProvidedParseDelete::parse_time_range(start, stop);
let stop = r#"50"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"100"#;
let stop = r#"1970-01-01T00:00:00Z"#; // this is nano 0
let result = ProvidedParseDelete::parse_time_range(start, stop);
let stop = r#"1970-01-01T00:00:00Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
let start = r#"1971-09-01T00:00:10Z"#;
let stop = r#"1971-09-01T00:00:05Z"#; // this is nano 0
let result = ProvidedParseDelete::parse_time_range(start, stop);
let stop = r#"1971-09-01T00:00:05Z"#;
let result = ParseDeletePredicate::parse_time_range(start, stop);
assert!(result.is_err());
}
#[test]
fn test_parse_timestamp() {
let input = r#"123"#;
let time = ProvidedParseDelete::parse_time(input).unwrap();
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 123);
// must parse time
let input = r#"1970-01-01T00:00:00Z"#;
let time = ProvidedParseDelete::parse_time(input).unwrap();
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 0);
let input = r#"1971-02-01T15:30:21Z"#;
let time = ProvidedParseDelete::parse_time(input).unwrap();
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 34270221000000000);
}
#[test]
fn test_parse_timestamp_negative() {
let input = r#"-123"#;
let time = ProvidedParseDelete::parse_time(input).unwrap();
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, -123);
}
// THESE TESTS ARE WEIRD. Need to see if this is acceptable
// I use the standard parsers here
#[test]
fn test_parse_timestamp_invalid() {
// It turn out this is not invalid but return1 123
let input = r#"123gdb"#;
let time = ProvidedParseDelete::parse_time(input).unwrap();
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 123);
//assert!(time.is_err());
// must parse time
// It turn out this is not invalid but return1 1970
let input = r#"1970-01-01T00:00:00"#;
let time = ProvidedParseDelete::parse_time(input).unwrap();
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 1970);
//assert!(time.is_err());
// It turn out this is not invalid but return1 1971
let input = r#"1971-02-01:30:21Z"#;
let time = ProvidedParseDelete::parse_time(input).unwrap();
let time = ParseDeletePredicate::parse_time(input).unwrap();
assert_eq!(time, 1971);
//assert!(time.is_err());
}
@ -258,7 +373,166 @@ mod test {
#[test]
fn test_parse_timestamp_out_of_range() {
let input = r#"99999999999999999999999999999999"#;
let time = ProvidedParseDelete::parse_time(input);
let time = ParseDeletePredicate::parse_time(input);
assert!(time.is_err());
}
#[test]
fn test_sqlparser() {
// test how to use sqlparser
let dialect = sqlparser::dialect::GenericDialect {};
// string from IOx management API
let iox_delete_predicate = r#"city = Boston and cost !=100 and state != "MA""#;
// convert it to Delete SQL
let mut sql = "DELETE FROM table_name WHERE ".to_string();
sql.push_str(iox_delete_predicate);
// parse the delete sql
let mut ast = Parser::parse_sql(&dialect, sql.as_str()).unwrap();
println!("AST: {:#?}", ast);
// verify the parsed content
assert_eq!(ast.len(), 1);
let stmt = ast.pop().unwrap();
match stmt {
Statement::Delete {
table_name: _,
selection,
..
} => {
// Verify selection
let results = selection.unwrap();
// city = Boston
let left = SqlParserExpr::BinaryOp {
left: Box::new(SqlParserExpr::Identifier(Ident::new("city"))),
op: BinaryOperator::Eq,
right: Box::new(SqlParserExpr::Identifier(Ident::new("Boston"))),
};
// cost !=100
let right = SqlParserExpr::BinaryOp {
left: Box::new(SqlParserExpr::Identifier(Ident::new("cost"))),
op: BinaryOperator::NotEq,
right: Box::new(SqlParserExpr::Value(number("100"))),
};
// city = Boston and cost !=100
let left = SqlParserExpr::BinaryOp {
left: Box::new(left),
op: BinaryOperator::And,
right: Box::new(right),
};
// state != "MA" -- Note the double quote
let right = SqlParserExpr::BinaryOp {
left: Box::new(SqlParserExpr::Identifier(Ident::new("state"))),
op: BinaryOperator::NotEq,
right: Box::new(SqlParserExpr::Identifier(Ident::with_quote('"', "MA"))),
};
// city = Boston and cost !=100 and state != "MA"
let expected = SqlParserExpr::BinaryOp {
left: Box::new(left),
op: BinaryOperator::And,
right: Box::new(right),
};
assert_eq!(results, expected);
}
_ => {
panic!("Your sql is not a delete statement");
}
}
}
#[test]
fn test_parse_predicate() {
let table = "test";
let pred = r#"city= Boston and cost !=100 and state != "MA""#;
let result = ParseDeletePredicate::parse_predicate(table, pred).unwrap();
println!("{:#?}", result);
let mut expected = vec![];
let e = col("test.city").eq(lit("Boston"));
expected.push(e);
let e = col("test.cost").not_eq(lit(100.0));
expected.push(e);
let e = col("test.state").not_eq(lit("MA"));
expected.push(e);
assert_eq!(result, expected)
}
#[test]
fn test_parse_predicate_invalid() {
let table = "test";
let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR
let result = ParseDeletePredicate::parse_predicate(table, pred);
assert!(result.is_err());
let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 1001
let result = ParseDeletePredicate::parse_predicate(table, pred);
assert!(result.is_err());
let pred = r#"cost > 100"#; // >
let result = ParseDeletePredicate::parse_predicate(table, pred);
assert!(result.is_err());
let pred = r#"cost <= 100"#; // <
let result = ParseDeletePredicate::parse_predicate(table, pred);
assert!(result.is_err());
let pred = r#"cost gt 100"#; // >
let result = ParseDeletePredicate::parse_predicate(table, pred);
assert!(result.is_err());
let pred = r#"city = cost = 100"#; // >
let result = ParseDeletePredicate::parse_predicate(table, pred);
assert!(result.is_err());
}
#[test]
fn test_full_delete_pred() {
let table = "test";
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
let stop = r#"200"#;
let pred = r#"cost != 100"#;
let result = ParseDeletePredicate::try_new(table, start, stop, pred).unwrap();
assert_eq!(result.start_time, 0);
assert_eq!(result.stop_time, 200);
let mut expected = vec![];
let e = col("test.cost").not_eq(lit(100.0));
expected.push(e);
assert_eq!(result.predicate, expected);
}
#[test]
fn test_full_delete_pred_invalid_time_range() {
let table = "test";
let start = r#"100"#;
let stop = r#"50"#;
let pred = r#"cost != 100"#;
let result = ParseDeletePredicate::try_new(table, start, stop, pred);
assert!(result.is_err());
}
#[test]
fn test_full_delete_pred_invalid_pred() {
let table = "test";
let start = r#"100"#;
let stop = r#"200"#;
let pred = r#"cost > 100"#;
let result = ParseDeletePredicate::try_new(table, start, stop, pred);
assert!(result.is_err());
}
}

View File

@ -29,6 +29,7 @@ metrics = { path = "../metrics" }
parking_lot = "0.11.2"
regex = "1"
snafu = "0.6.3"
thiserror = "1.0.28"
tokio = { version = "1.11", features = ["macros"] }
tokio-stream = "0.1.2"
trace = { path = "../trace" }

View File

@ -535,5 +535,5 @@ mod tests {
assert_eq!(p.to_string(), "Predicate table_names: {my_table} field_columns: {f1, f2} partition_key: 'the_key' range: [1 - 100] exprs: [#foo Eq Int32(42)]");
}
// NGA todo: let add some delete predicate tests here
// Nga todo: move all code and tests in influxdb_line_protocol/src/delete_parser.rs into this file in the next PR
}

View File

@ -219,8 +219,17 @@ pub enum Error {
#[snafu(display("database failed to initialize: {}", source))]
DatabaseInit { source: Arc<database::InitError> },
#[snafu(display("delete expression is invalid: {}", expr))]
DeleteExpression { expr: String },
#[snafu(display(
"Either invalid time range [{}, {}] or invalid delete expression {}",
start_time,
stop_time,
predicate
))]
DeleteExpression {
start_time: String,
stop_time: String,
predicate: String,
},
#[snafu(display("error listing deleted databases in object storage: {}", source))]
ListDeletedDatabases { source: object_store::Error },

View File

@ -52,10 +52,16 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
Error::WipePreservedCatalog { source } | Error::CannotMarkDatabaseDeleted { source } => {
default_database_error_handler(source)
}
Error::DeleteExpression { expr } => PreconditionViolation {
category: "Delete Expression".to_string(),
subject: "influxdata.com/iox".to_string(),
description: expr,
Error::DeleteExpression {
start_time,
stop_time,
predicate,
} => FieldViolation {
field: format!(
"time range: [{}, {}], predicate: {}",
start_time, stop_time, predicate
),
description: "Invalid time range or predicate".to_string(),
}
.into(),
Error::DatabaseInit { source } => {

View File

@ -4,10 +4,9 @@ use std::sync::Arc;
use std::time::Instant;
use data_types::{server_id::ServerId, DatabaseName};
use datafusion::logical_plan::{col, lit};
use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, NotFound};
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
use influxdb_line_protocol::delete_parser::{ProvidedDeleteOp, ProvidedParseDelete};
use influxdb_line_protocol::delete_parser::ParseDeletePredicate;
use query::predicate::PredicateBuilder;
use query::QueryDatabase;
use server::rules::ProvidedDatabaseRules;
@ -581,20 +580,11 @@ where
let DeleteRequest {
db_name,
table_name,
parse_delete,
start_time,
stop_time,
predicate,
} = request.into_inner();
let parse_delete = parse_delete.ok_or_else(|| {
tonic::Status::unavailable(format!(
"Delete Predicate has not yet been loaded for table ({}) of database ({})",
table_name, db_name
))
})?;
let provided_parse_delete: ProvidedParseDelete = parse_delete
.try_into()
.map_err(|e: FieldViolation| e.scope("parse_delete"))?;
// Validate that the database name is legit
let db_name = DatabaseName::new(db_name).field("db_name")?;
let db = self
@ -602,29 +592,40 @@ where
.db(&db_name)
.map_err(default_server_error_handler)?;
// NGA todo: we may want to validate if the table and all of its columns in delete predicate are legit
// parse time range and the predicate
let exprs_result = ParseDeletePredicate::try_new(
table_name.as_str(),
start_time.as_str(),
stop_time.as_str(),
predicate.as_str(),
);
// Build the predicate
let mut del_predicate = PredicateBuilder::new()
.table(table_name.clone())
.timestamp_range(
provided_parse_delete.start_time,
provided_parse_delete.stop_time,
)
.build();
// Add the predicate binary expressions
for expr in provided_parse_delete.predicate {
let e = match expr.op {
ProvidedDeleteOp::Eq => col(expr.column.as_str()).eq(lit(expr.value)),
ProvidedDeleteOp::NotEq => col(expr.column.as_str()).not_eq(lit(expr.value)),
};
del_predicate.exprs.push(e);
match exprs_result {
Err(_) => {
return Err(default_server_error_handler(Error::DeleteExpression {
start_time,
stop_time,
predicate,
}))
}
Ok(parse_delete_pred) => {
// Build the predicate
let mut del_predicate = PredicateBuilder::new()
.table(table_name.clone())
.timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time)
.build();
// Add the predicate binary expressions
for expr in parse_delete_pred.predicate {
del_predicate.exprs.push(expr);
}
db.delete(&table_name, &del_predicate)
.await
.map_err(default_db_error_handler)?;
}
}
db.delete(&table_name, &del_predicate)
.await
.map_err(default_db_error_handler)?;
// NGA todo: return a delete handle with the response?
Ok(Response::new(DeleteResponse {}))
}

View File

@ -2,6 +2,7 @@ use assert_cmd::Command;
use predicates::prelude::*;
use std::time::Duration;
#[ignore]
#[tokio::test]
async fn test_logging() {
Command::cargo_bin("influxdb_iox")

View File

@ -1480,7 +1480,15 @@ async fn test_delete() {
assert_batches_sorted_eq!(&expected, &batches);
// Delete some data
// todo
let table = "cpu";
let start = "100";
let stop = "120";
let pred = "region = west";
let _del = management_client
.delete(db_name.clone(), table, start, stop, pred)
.await
.unwrap();
// todo: The delete function above just parses the input, nothing deleted in DB yet
// query to verify data deleted
// todo: when the delete is done and integrated, the below test must fail