refactor: address review comments
parent
bd22c73b8a
commit
154dd4460e
|
@ -1740,7 +1740,6 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"nom 7.0.0",
|
||||
"observability_deps",
|
||||
"serde_json",
|
||||
"smallvec",
|
||||
"snafu",
|
||||
"test_helpers",
|
||||
|
@ -2967,6 +2966,7 @@ dependencies = [
|
|||
"internal_types",
|
||||
"observability_deps",
|
||||
"regex",
|
||||
"serde_json",
|
||||
"snafu",
|
||||
"sqlparser",
|
||||
"test_helpers",
|
||||
|
|
|
@ -6,7 +6,6 @@ edition = "2018"
|
|||
|
||||
[dependencies] # In alphabetical order
|
||||
nom = "7"
|
||||
serde_json = "1.0.67"
|
||||
smallvec = "1.2.0"
|
||||
snafu = "0.6.2"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
|
|
|
@ -37,8 +37,6 @@ use std::{
|
|||
ops::Deref,
|
||||
};
|
||||
|
||||
const FLUX_TABLE: &str = "_measurement";
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display(r#"Must not contain duplicate tags, but "{}" was repeated"#, tag_key))]
|
||||
|
@ -71,21 +69,6 @@ pub enum Error {
|
|||
value: String,
|
||||
},
|
||||
|
||||
#[snafu(display(r#"Unable to parse delete string '{}'"#, value))]
|
||||
DeleteInvalid {
|
||||
source: serde_json::Error,
|
||||
value: String,
|
||||
},
|
||||
|
||||
#[snafu(display(r#"Invalid JSON format of delete string '{}'"#, value))]
|
||||
DeleteObjectInvalid { value: String },
|
||||
|
||||
#[snafu(display(r#"Invalid table name in delete '{}'"#, value))]
|
||||
DeleteTableInvalid { value: String },
|
||||
|
||||
#[snafu(display(r#"Delete must include a start time and a stop time'{}'"#, value))]
|
||||
DeleteStartStopInvalid { value: String },
|
||||
|
||||
// This error is for compatibility with the Go parser
|
||||
#[snafu(display(
|
||||
r#"Measurements, tag keys and values, and field keys may not end with a backslash"#
|
||||
|
@ -1071,103 +1054,6 @@ fn escape_and_write_value(
|
|||
f.write_str(&value[last..])
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Clone)]
|
||||
/// data of a parsed delete
|
||||
pub struct ParsedDelete {
|
||||
/// Empty string, "", if no table specified
|
||||
pub table_name: String,
|
||||
pub start_time: String,
|
||||
pub stop_time: String,
|
||||
pub predicate: String,
|
||||
}
|
||||
|
||||
/// Return parsed data of an influx delete
|
||||
/// A few input examples and their parsed results:
|
||||
/// {"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
|
||||
/// => table_name="mytable", start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=\"Orient.local\"""
|
||||
/// {"predicate":"host=Orient.local and val != 50","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
|
||||
/// => start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=Orient.local and val != 50"
|
||||
pub fn parse_delete(input: &str) -> Result<ParsedDelete> {
|
||||
let parsed_obj: serde_json::Value =
|
||||
serde_json::from_str(input).context(DeleteInvalid { value: input })?;
|
||||
let mut parsed_delete = ParsedDelete::default();
|
||||
|
||||
if let serde_json::Value::Object(items) = parsed_obj {
|
||||
for item in items {
|
||||
// The value must be type String
|
||||
if let Some(val) = item.1.as_str() {
|
||||
match item.0.to_lowercase().as_str() {
|
||||
"start" => parsed_delete.start_time = val.to_string(),
|
||||
"stop" => parsed_delete.stop_time = val.to_string(),
|
||||
"predicate" => parsed_delete.predicate = val.to_string(),
|
||||
_ => {
|
||||
return Err(Error::DeleteObjectInvalid {
|
||||
value: input.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(Error::DeleteObjectInvalid {
|
||||
value: input.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(Error::DeleteObjectInvalid {
|
||||
value: input.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Start or stop is empty
|
||||
if parsed_delete.start_time.is_empty() || parsed_delete.stop_time.is_empty() {
|
||||
return Err(Error::DeleteStartStopInvalid {
|
||||
value: input.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Extract table from the predicate if any
|
||||
if parsed_delete.predicate.contains(FLUX_TABLE) {
|
||||
// since predicate is a conjunctive expression, split them by "and"
|
||||
let predicate = parsed_delete
|
||||
.predicate
|
||||
.replace(" AND ", " and ")
|
||||
.replace(" ANd ", " and ")
|
||||
.replace(" And ", " and ")
|
||||
.replace(" AnD ", " and ");
|
||||
|
||||
let split: Vec<&str> = predicate.split("and").collect();
|
||||
|
||||
let mut predicate_no_table = "".to_string();
|
||||
for s in split {
|
||||
if s.contains(FLUX_TABLE) {
|
||||
// This should be in form "_measurement = <your_table_name>"
|
||||
// only <keep your_table_name> by replacing the rest with ""
|
||||
let table_name = s
|
||||
.replace(FLUX_TABLE, "")
|
||||
.replace("=", "")
|
||||
.trim()
|
||||
.to_string();
|
||||
// Do not support white spaces in table name
|
||||
if table_name.contains(' ') {
|
||||
return Err(Error::DeleteTableInvalid {
|
||||
value: input.to_string(),
|
||||
});
|
||||
}
|
||||
parsed_delete.table_name = table_name;
|
||||
} else {
|
||||
// This is a normal column comparison, put it back to send to sqlparser later
|
||||
if !predicate_no_table.is_empty() {
|
||||
predicate_no_table.push_str(" and ")
|
||||
}
|
||||
predicate_no_table.push_str(s.trim());
|
||||
}
|
||||
}
|
||||
parsed_delete.predicate = predicate_no_table;
|
||||
}
|
||||
|
||||
Ok(parsed_delete)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
@ -2331,83 +2217,4 @@ her"#,
|
|||
|
||||
assert_eq!(vals[0].tag_value("asdf"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_full() {
|
||||
let delete_str = r#"{"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
|
||||
|
||||
let expected = ParsedDelete {
|
||||
table_name: "mytable".to_string(),
|
||||
predicate: "host=\"Orient.local\"".to_string(),
|
||||
start_time: "1970-01-01T00:00:00Z".to_string(),
|
||||
stop_time: "2070-01-02T00:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
let result = parse_delete(delete_str).unwrap();
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_no_table() {
|
||||
let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#;
|
||||
|
||||
let expected = ParsedDelete {
|
||||
table_name: "".to_string(),
|
||||
predicate: "host=\"Orient.local\"".to_string(),
|
||||
start_time: "1970-01-01T00:00:00Z".to_string(),
|
||||
stop_time: "2070-01-02T00:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
let result = parse_delete(delete_str).unwrap();
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_empty_predicate() {
|
||||
let delete_str =
|
||||
r#"{"start":"1970-01-01T00:00:00Z","predicate":"","stop":"2070-01-02T00:00:00Z"}"#;
|
||||
|
||||
let expected = ParsedDelete {
|
||||
table_name: "".to_string(),
|
||||
predicate: "".to_string(),
|
||||
start_time: "1970-01-01T00:00:00Z".to_string(),
|
||||
stop_time: "2070-01-02T00:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
let result = parse_delete(delete_str).unwrap();
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_no_predicate() {
|
||||
let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
|
||||
|
||||
let expected = ParsedDelete {
|
||||
table_name: "".to_string(),
|
||||
predicate: "".to_string(),
|
||||
start_time: "1970-01-01T00:00:00Z".to_string(),
|
||||
stop_time: "2070-01-02T00:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
let result = parse_delete(delete_str).unwrap();
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_negative() {
|
||||
// invalid key
|
||||
let delete_str = r#"{"invalid":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
|
||||
let result = parse_delete(delete_str);
|
||||
assert!(result.is_err());
|
||||
|
||||
// invalid timestamp value
|
||||
let delete_str = r#"{"start":123,"stop":"2070-01-02T00:00:00Z"}"#;
|
||||
let result = parse_delete(delete_str);
|
||||
assert!(result.is_err());
|
||||
|
||||
// invalid JSON
|
||||
let delete_str = r#"{"start":"1970-01-01T00:00:00Z",;"stop":"2070-01-02T00:00:00Z"}"#;
|
||||
let result = parse_delete(delete_str);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ generated_types = { path = "../generated_types" }
|
|||
internal_types = { path = "../internal_types" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
regex = "1"
|
||||
serde_json = "1.0.67"
|
||||
snafu = "0.6.9"
|
||||
sqlparser = "0.11.0"
|
||||
|
||||
|
|
|
@ -23,10 +23,12 @@ use sqlparser::{
|
|||
parser::Parser,
|
||||
};
|
||||
|
||||
use snafu::Snafu;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
use chrono::DateTime;
|
||||
|
||||
const FLUX_TABLE: &str = "_measurement";
|
||||
|
||||
// Parse Delete Predicates
|
||||
/// Parse Error
|
||||
#[derive(Debug, Snafu)]
|
||||
|
@ -50,6 +52,30 @@ pub enum Error {
|
|||
/// Predicate include non supported expression
|
||||
#[snafu(display("Delete predicate must be conjunctive expressions of binary 'column_name = literal' or 'column_ame != literal': ({})", value))]
|
||||
NotSupportPredicate { value: String },
|
||||
|
||||
#[snafu(display(r#"Unable to parse delete string '{}'"#, value))]
|
||||
DeleteInvalid {
|
||||
source: serde_json::Error,
|
||||
value: String,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
r#"Invalid key which is either 'start', 'stop', or 'predicate': '{}'"#,
|
||||
value
|
||||
))]
|
||||
DeleteKeywordInvalid { value: String },
|
||||
|
||||
#[snafu(display(r#"Invalid timestamp or predicate value: '{}'"#, value))]
|
||||
DeleteValueInvalid { value: String },
|
||||
|
||||
#[snafu(display(r#"Invalid JSON format of delete string '{}'"#, value))]
|
||||
DeleteObjectInvalid { value: String },
|
||||
|
||||
#[snafu(display(r#"Invalid table name in delete '{}'"#, value))]
|
||||
DeleteTableInvalid { value: String },
|
||||
|
||||
#[snafu(display(r#"Delete must include a start time and a stop time'{}'"#, value))]
|
||||
DeleteStartStopInvalid { value: String },
|
||||
}
|
||||
|
||||
/// Result type for Parser Cient
|
||||
|
@ -526,12 +552,12 @@ impl ParseDeletePredicate {
|
|||
}
|
||||
|
||||
/// Parse and convert the delete grpc API into ParseDeletePredicate to send to server
|
||||
pub fn try_new(table_name: &str, start: &str, stop: &str, predicate: &str) -> Result<Self> {
|
||||
pub fn try_new(start: &str, stop: &str, predicate: &str) -> Result<Self> {
|
||||
// parse and check time range
|
||||
let (start_time, stop_time) = Self::parse_time_range(start, stop)?;
|
||||
|
||||
// Parse the predicate
|
||||
let delete_exprs = Self::parse_predicate(table_name, predicate)?;
|
||||
let delete_exprs = Self::parse_predicate(predicate)?;
|
||||
|
||||
Ok(Self::new(start_time, stop_time, delete_exprs))
|
||||
}
|
||||
|
@ -540,22 +566,14 @@ impl ParseDeletePredicate {
|
|||
/// A delete predicate is a conjunctive expression of many
|
||||
/// binary expressions of 'colum = constant' or 'column != constant'
|
||||
///
|
||||
pub fn parse_predicate(table_name: &str, predicate: &str) -> Result<Vec<Expr>> {
|
||||
pub fn parse_predicate(predicate: &str) -> Result<Vec<Expr>> {
|
||||
if predicate.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// Since table-name can be empty, let assign it a random name to have sqlparser work
|
||||
let mut table = "some_table";
|
||||
if !table_name.is_empty() {
|
||||
table = table_name;
|
||||
}
|
||||
|
||||
// Now add this predicate string into a DELETE SQL to user sqlparser to parse it
|
||||
// "DELETE FROM table_name WHERE predicate"
|
||||
let mut sql = "DELETE FROM ".to_string();
|
||||
sql.push_str(table);
|
||||
sql.push_str(" WHERE ");
|
||||
// Table name can be anything to have sqlparser work on the right sql syntax
|
||||
let mut sql = "DELETE FROM table_name WHERE ".to_string();
|
||||
sql.push_str(predicate);
|
||||
|
||||
// parse the delete sql
|
||||
|
@ -582,7 +600,7 @@ impl ParseDeletePredicate {
|
|||
}) => {
|
||||
// split this expr into smaller binary if any
|
||||
let mut exprs = vec![];
|
||||
let split = Self::split_members(table_name, &expr, &mut exprs);
|
||||
let split = Self::split_members(&expr, &mut exprs);
|
||||
if !split {
|
||||
return Err(Error::NotSupportPredicate {
|
||||
value: predicate.to_string(),
|
||||
|
@ -599,21 +617,18 @@ impl ParseDeletePredicate {
|
|||
}
|
||||
|
||||
pub fn build_delete_predicate(
|
||||
table_name: String,
|
||||
start_time: String,
|
||||
stop_time: String,
|
||||
predicate: String,
|
||||
) -> Result<Predicate, Error> {
|
||||
// parse time range and the predicate
|
||||
let parse_delete_pred = ParseDeletePredicate::try_new(
|
||||
table_name.as_str(),
|
||||
start_time.as_str(),
|
||||
stop_time.as_str(),
|
||||
predicate.as_str(),
|
||||
)?;
|
||||
|
||||
let mut del_predicate = PredicateBuilder::new()
|
||||
.table(table_name)
|
||||
.timestamp_range(parse_delete_pred.start_time, parse_delete_pred.stop_time)
|
||||
.build();
|
||||
|
||||
|
@ -631,11 +646,7 @@ impl ParseDeletePredicate {
|
|||
/// "column_name = literal" or "column_name != literal"
|
||||
///
|
||||
/// The split expressions will be converted into data fusion expressions
|
||||
pub fn split_members(
|
||||
table_name: &str,
|
||||
predicate: &SqlParserExpr,
|
||||
predicates: &mut Vec<Expr>,
|
||||
) -> bool {
|
||||
pub fn split_members(predicate: &SqlParserExpr, predicates: &mut Vec<Expr>) -> bool {
|
||||
// The below code built to be compatible with
|
||||
// https://github.com/influxdata/influxdb/blob/master/predicate/parser_test.go
|
||||
match predicate {
|
||||
|
@ -644,10 +655,10 @@ impl ParseDeletePredicate {
|
|||
op: BinaryOperator::And,
|
||||
right,
|
||||
} => {
|
||||
if !Self::split_members(table_name, left, predicates) {
|
||||
if !Self::split_members(left, predicates) {
|
||||
return false;
|
||||
}
|
||||
if !Self::split_members(table_name, right, predicates) {
|
||||
if !Self::split_members(right, predicates) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -745,6 +756,108 @@ impl ParseDeletePredicate {
|
|||
}
|
||||
}
|
||||
|
||||
// Note that this struct and its functions are used to parse FLUX DELETE,
|
||||
// https://docs.influxdata.com/influxdb/v2.0/write-data/delete-data/, which happens before
|
||||
// the parsing of timestamps and sql predicate. The examples below will show FLUX DELETE's syntax which is
|
||||
// different from SQL syntax so we need this extra parsing step before invoking sqlparser to parse the
|
||||
// sql-format predicates and timestamps
|
||||
#[derive(Debug, Default, PartialEq, Clone)]
|
||||
/// data of a parsed delete
|
||||
pub struct ParsedDelete {
|
||||
/// Empty string, "", if no table specified
|
||||
pub table_name: String,
|
||||
pub start_time: String,
|
||||
pub stop_time: String,
|
||||
pub predicate: String,
|
||||
}
|
||||
|
||||
/// Return parsed data of an influx delete:
|
||||
/// A few input examples and their parsed results:
|
||||
/// {"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
|
||||
/// => table_name="mytable", start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=\"Orient.local\"""
|
||||
/// {"predicate":"host=Orient.local and val != 50","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}
|
||||
/// => start_time="1970-01-01T00:00:00Z", end_time="2070-01-02T00:00:00Z", predicate="host=Orient.local and val != 50"
|
||||
pub fn parse_delete(input: &str) -> Result<ParsedDelete> {
|
||||
let parsed_obj: serde_json::Value =
|
||||
serde_json::from_str(input).context(DeleteInvalid { value: input })?;
|
||||
let mut parsed_delete = ParsedDelete::default();
|
||||
|
||||
if let serde_json::Value::Object(items) = parsed_obj {
|
||||
for item in items {
|
||||
// The value must be type String
|
||||
if let Some(val) = item.1.as_str() {
|
||||
match item.0.to_lowercase().as_str() {
|
||||
"start" => parsed_delete.start_time = val.to_string(),
|
||||
"stop" => parsed_delete.stop_time = val.to_string(),
|
||||
"predicate" => parsed_delete.predicate = val.to_string(),
|
||||
_ => {
|
||||
return Err(Error::DeleteKeywordInvalid {
|
||||
value: input.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(Error::DeleteValueInvalid {
|
||||
value: input.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(Error::DeleteObjectInvalid {
|
||||
value: input.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Start or stop is empty
|
||||
if parsed_delete.start_time.is_empty() || parsed_delete.stop_time.is_empty() {
|
||||
return Err(Error::DeleteStartStopInvalid {
|
||||
value: input.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
// Extract table from the predicate if any
|
||||
if parsed_delete.predicate.contains(FLUX_TABLE) {
|
||||
// since predicate is a conjunctive expression, split them by "and"
|
||||
let predicate = parsed_delete
|
||||
.predicate
|
||||
.replace(" AND ", " and ")
|
||||
.replace(" ANd ", " and ")
|
||||
.replace(" And ", " and ")
|
||||
.replace(" AnD ", " and ");
|
||||
|
||||
let split: Vec<&str> = predicate.split("and").collect();
|
||||
|
||||
let mut predicate_no_table = "".to_string();
|
||||
for s in split {
|
||||
if s.contains(FLUX_TABLE) {
|
||||
// This should be in form "_measurement = <your_table_name>"
|
||||
// only <keep your_table_name> by replacing the rest with ""
|
||||
let table_name = s
|
||||
.replace(FLUX_TABLE, "")
|
||||
.replace("=", "")
|
||||
.trim()
|
||||
.to_string();
|
||||
// Do not support white spaces in table name
|
||||
if table_name.contains(' ') {
|
||||
return Err(Error::DeleteTableInvalid {
|
||||
value: input.to_string(),
|
||||
});
|
||||
}
|
||||
parsed_delete.table_name = table_name;
|
||||
} else {
|
||||
// This is a normal column comparison, put it back to send to sqlparser later
|
||||
if !predicate_no_table.is_empty() {
|
||||
predicate_no_table.push_str(" and ")
|
||||
}
|
||||
predicate_no_table.push_str(s.trim());
|
||||
}
|
||||
}
|
||||
parsed_delete.predicate = predicate_no_table;
|
||||
}
|
||||
|
||||
Ok(parsed_delete)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -995,10 +1108,8 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_parse_predicate() {
|
||||
let table = "test";
|
||||
|
||||
let pred = r#"city= Boston and cost !=100 and state != "MA" AND temp=87.5"#;
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred).unwrap();
|
||||
let result = ParseDeletePredicate::parse_predicate(pred).unwrap();
|
||||
|
||||
println!("{:#?}", result);
|
||||
|
||||
|
@ -1018,41 +1129,38 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_parse_predicate_invalid() {
|
||||
let table = "test";
|
||||
|
||||
let pred = r#"city= Boston Or cost !=100 and state != "MA""#; // OR
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
let result = ParseDeletePredicate::parse_predicate(pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"city= Boston and cost !=100+1 and state != "MA""#; // 100 + 1
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
let result = ParseDeletePredicate::parse_predicate(pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost > 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
let result = ParseDeletePredicate::parse_predicate(pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost <= 100"#; // <
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
let result = ParseDeletePredicate::parse_predicate(pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"cost gt 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
let result = ParseDeletePredicate::parse_predicate(pred);
|
||||
assert!(result.is_err());
|
||||
|
||||
let pred = r#"city = cost = 100"#; // >
|
||||
let result = ParseDeletePredicate::parse_predicate(table, pred);
|
||||
let result = ParseDeletePredicate::parse_predicate(pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_delete_pred() {
|
||||
let table = "test";
|
||||
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
let stop = r#"200"#;
|
||||
let pred = r#"cost != 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred).unwrap();
|
||||
let result = ParseDeletePredicate::try_new(start, stop, pred).unwrap();
|
||||
assert_eq!(result.start_time, 0);
|
||||
assert_eq!(result.stop_time, 200);
|
||||
|
||||
|
@ -1065,23 +1173,108 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_full_delete_pred_invalid_time_range() {
|
||||
let table = "test";
|
||||
let start = r#"100"#;
|
||||
let stop = r#"50"#;
|
||||
let pred = r#"cost != 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred);
|
||||
let result = ParseDeletePredicate::try_new(start, stop, pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_full_delete_pred_invalid_pred() {
|
||||
let table = "test";
|
||||
let start = r#"100"#;
|
||||
let stop = r#"200"#;
|
||||
let pred = r#"cost > 100"#;
|
||||
|
||||
let result = ParseDeletePredicate::try_new(table, start, stop, pred);
|
||||
let result = ParseDeletePredicate::try_new(start, stop, pred);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_full() {
|
||||
let delete_str = r#"{"predicate":"_measurement=mytable AND host=\"Orient.local\"","start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
|
||||
|
||||
let expected = ParsedDelete {
|
||||
table_name: "mytable".to_string(),
|
||||
predicate: "host=\"Orient.local\"".to_string(),
|
||||
start_time: "1970-01-01T00:00:00Z".to_string(),
|
||||
stop_time: "2070-01-02T00:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
let result = parse_delete(delete_str).unwrap();
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_no_table() {
|
||||
let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z", "predicate":"host=\"Orient.local\""}"#;
|
||||
|
||||
let expected = ParsedDelete {
|
||||
table_name: "".to_string(),
|
||||
predicate: "host=\"Orient.local\"".to_string(),
|
||||
start_time: "1970-01-01T00:00:00Z".to_string(),
|
||||
stop_time: "2070-01-02T00:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
let result = parse_delete(delete_str).unwrap();
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_empty_predicate() {
|
||||
let delete_str =
|
||||
r#"{"start":"1970-01-01T00:00:00Z","predicate":"","stop":"2070-01-02T00:00:00Z"}"#;
|
||||
|
||||
let expected = ParsedDelete {
|
||||
table_name: "".to_string(),
|
||||
predicate: "".to_string(),
|
||||
start_time: "1970-01-01T00:00:00Z".to_string(),
|
||||
stop_time: "2070-01-02T00:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
let result = parse_delete(delete_str).unwrap();
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_no_predicate() {
|
||||
let delete_str = r#"{"start":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
|
||||
|
||||
let expected = ParsedDelete {
|
||||
table_name: "".to_string(),
|
||||
predicate: "".to_string(),
|
||||
start_time: "1970-01-01T00:00:00Z".to_string(),
|
||||
stop_time: "2070-01-02T00:00:00Z".to_string(),
|
||||
};
|
||||
|
||||
let result = parse_delete(delete_str).unwrap();
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
// NGA todo: check content of error messages
|
||||
#[test]
|
||||
fn test_parse_delete_negative() {
|
||||
// invalid key
|
||||
let delete_str = r#"{"invalid":"1970-01-01T00:00:00Z","stop":"2070-01-02T00:00:00Z"}"#;
|
||||
let result = parse_delete(delete_str);
|
||||
let err = result.unwrap_err();
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("Invalid key which is either 'start', 'stop', or 'predicate'"));
|
||||
|
||||
// invalid timestamp value
|
||||
let delete_str = r#"{"start":123,"stop":"2070-01-02T00:00:00Z"}"#;
|
||||
let result = parse_delete(delete_str);
|
||||
let err = result.unwrap_err();
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("Invalid timestamp or predicate value"));
|
||||
|
||||
// invalid JSON
|
||||
let delete_str = r#"{"start":"1970-01-01T00:00:00Z",;"stop":"2070-01-02T00:00:00Z"}"#;
|
||||
let result = parse_delete(delete_str);
|
||||
let err = result.unwrap_err();
|
||||
assert!(err.to_string().contains("Unable to parse delete string"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -317,7 +317,7 @@ mod tests {
|
|||
let predicate = r#"city=Boston and cost!=100 and temp=87.5 and good=true"#;
|
||||
|
||||
let parse_delete_pred =
|
||||
ParseDeletePredicate::try_new(table_name, start_time, stop_time, predicate).unwrap();
|
||||
ParseDeletePredicate::try_new(start_time, stop_time, predicate).unwrap();
|
||||
|
||||
let mut del_predicate_builder = PredicateBuilder::new()
|
||||
.table(table_name)
|
||||
|
|
|
@ -25,8 +25,8 @@ use data_types::{
|
|||
DatabaseName,
|
||||
};
|
||||
use influxdb_iox_client::format::QueryOutputFormat;
|
||||
use influxdb_line_protocol::{parse_delete, parse_lines};
|
||||
use predicate::predicate::ParseDeletePredicate;
|
||||
use influxdb_line_protocol::parse_lines;
|
||||
use predicate::predicate::{parse_delete, ParseDeletePredicate};
|
||||
use query::exec::ExecutionContextProvider;
|
||||
use server::{ApplicationState, ConnectionManager, Error, Server as AppServer};
|
||||
|
||||
|
@ -163,7 +163,7 @@ pub enum ApplicationError {
|
|||
|
||||
#[snafu(display("Error parsing delete {}: {}", input, source))]
|
||||
ParsingDelete {
|
||||
source: influxdb_line_protocol::Error,
|
||||
source: predicate::predicate::Error,
|
||||
input: String,
|
||||
},
|
||||
|
||||
|
@ -603,11 +603,13 @@ where
|
|||
let db = server.db(&db_name)?;
|
||||
|
||||
// Build delete predicate
|
||||
let del_predicate =
|
||||
ParseDeletePredicate::build_delete_predicate(table_name.clone(), start, stop, predicate)
|
||||
.context(BuildingDeletePredicate { input: body })?;
|
||||
let del_predicate = ParseDeletePredicate::build_delete_predicate(start, stop, predicate)
|
||||
.context(BuildingDeletePredicate { input: body })?;
|
||||
|
||||
// Tables data will be deleted from
|
||||
// Note for developer: this the only place we support INFLUX DELETE that deletes
|
||||
// data from many tables in one command. If you want to use general delete API to
|
||||
// delete data from a specified table, use the one in the management API (src/influxdb_ioxd/rpc/management.rs) instead
|
||||
let mut tables = vec![];
|
||||
if table_name.is_empty() {
|
||||
tables = db.table_names();
|
||||
|
|
|
@ -624,7 +624,6 @@ where
|
|||
.map_err(default_server_error_handler)?;
|
||||
|
||||
let del_predicate_result = ParseDeletePredicate::build_delete_predicate(
|
||||
table_name.clone(),
|
||||
start_time.clone(),
|
||||
stop_time.clone(),
|
||||
predicate.clone(),
|
||||
|
|
Loading…
Reference in New Issue