refactor: cleanup
parent
4801b2c238
commit
de0bd80c3d
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::chunk_metadata::ChunkAddr;
|
||||
use crate::partition_metadata::{DeleteInfo, PartitionAddr};
|
||||
use crate::partition_metadata::PartitionAddr;
|
||||
|
||||
/// Metadata associated with a set of background tasks
|
||||
/// Used in combination with TrackerRegistry
|
||||
|
@ -42,9 +42,6 @@ pub enum Job {
|
|||
|
||||
/// Wipe preserved catalog
|
||||
WipePreservedCatalog { db_name: Arc<str> },
|
||||
|
||||
/// Delete data from a table
|
||||
Delete { delete_info: DeleteInfo },
|
||||
}
|
||||
|
||||
impl Job {
|
||||
|
@ -59,7 +56,6 @@ impl Job {
|
|||
Self::DropChunk { chunk, .. } => Some(&chunk.db_name),
|
||||
Self::DropPartition { partition, .. } => Some(&partition.db_name),
|
||||
Self::WipePreservedCatalog { db_name, .. } => Some(db_name),
|
||||
Self::Delete { delete_info, .. } => Some(&delete_info.db_name),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,7 +70,6 @@ impl Job {
|
|||
Self::DropChunk { chunk, .. } => Some(&chunk.partition_key),
|
||||
Self::DropPartition { partition, .. } => Some(&partition.partition_key),
|
||||
Self::WipePreservedCatalog { .. } => None,
|
||||
Self::Delete { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -89,7 +84,6 @@ impl Job {
|
|||
Self::DropChunk { chunk, .. } => Some(&chunk.table_name),
|
||||
Self::DropPartition { partition, .. } => Some(&partition.table_name),
|
||||
Self::WipePreservedCatalog { .. } => None,
|
||||
Self::Delete { delete_info, .. } => Some(&delete_info.table_name),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -104,7 +98,6 @@ impl Job {
|
|||
Self::DropChunk { chunk, .. } => Some(vec![chunk.chunk_id]),
|
||||
Self::DropPartition { .. } => None,
|
||||
Self::WipePreservedCatalog { .. } => None,
|
||||
Self::Delete { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,7 +114,6 @@ impl Job {
|
|||
"Drop partition from memory and (if persisted) from object store"
|
||||
}
|
||||
Self::WipePreservedCatalog { .. } => "Wipe preserved catalog",
|
||||
Self::Delete { .. } => "Delete data from table",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,38 +163,6 @@ impl TableSummary {
|
|||
}
|
||||
}
|
||||
|
||||
/// Delete information
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct DeleteInfo {
|
||||
/// Database name
|
||||
pub db_name: Arc<str>,
|
||||
|
||||
/// Table with data to delete
|
||||
pub table_name: Arc<str>,
|
||||
|
||||
/// Delete Predicate
|
||||
// Ideally, this can be any complicated expressions that DataFusion supports
|
||||
// but in our first version, we only support what our read buffer does which is
|
||||
// conjunctive expressions with columns being compared to literals using = or != operators.
|
||||
pub delete_predicate: Arc<str>,
|
||||
|
||||
/// Start time range of deleting data
|
||||
pub start_time: Arc<str>,
|
||||
|
||||
/// Stop time range of deleting data
|
||||
pub stop_time: Arc<str>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for DeleteInfo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Delete('{}':'{}':'{}')",
|
||||
self.db_name, self.table_name, self.delete_predicate
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Replicate this enum here as it can't be derived from the existing statistics
|
||||
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
|
||||
pub enum InfluxDbType {
|
||||
|
|
|
@ -40,7 +40,6 @@ message OperationMetadata {
|
|||
PersistChunks persist_chunks = 11;
|
||||
DropChunk drop_chunk = 12;
|
||||
DropPartition drop_partition = 17;
|
||||
Delete delete = 18;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -433,24 +433,10 @@ message DeleteRequest {
|
|||
// table name
|
||||
string table_name = 2;
|
||||
|
||||
// a struct of delete info defined in ParseDelete
|
||||
// a delete info: time range and predicate of other binary expressions
|
||||
ParseDelete parse_delete = 3;
|
||||
|
||||
// // delete predicate
|
||||
// // Ideally, this can be any complicated expressions that DataFusion supports
|
||||
// // but in our first version, we only support what our read buffer does which is
|
||||
// // conjunctive expressions with columns being compared to literals using = or != operators.
|
||||
// // Also, to avoid user making mistake deleting the whole table, we will force them to
|
||||
// // inlcude delete time range start and stop in different fields defined below
|
||||
// string delete_predicate = 3;
|
||||
|
||||
// // start time range of deleting data
|
||||
// string start_time = 4;
|
||||
|
||||
// // stop time range of deleting data
|
||||
// string stop_time = 5;
|
||||
}
|
||||
|
||||
message DeleteResponse {
|
||||
// NGA todo: response something?
|
||||
// NGA todo: define an appropriate response
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use crate::influxdata::iox::management::v1 as management;
|
|||
use crate::protobuf_type_url_eq;
|
||||
use data_types::chunk_metadata::ChunkAddr;
|
||||
use data_types::job::{Job, OperationStatus};
|
||||
use data_types::partition_metadata::{DeleteInfo, PartitionAddr};
|
||||
use data_types::partition_metadata::PartitionAddr;
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -58,13 +58,6 @@ impl From<Job> for management::operation_metadata::Job {
|
|||
partition_key: partition.partition_key.to_string(),
|
||||
table_name: partition.table_name.to_string(),
|
||||
}),
|
||||
Job::Delete { delete_info } => Self::Delete(management::Delete {
|
||||
db_name: delete_info.db_name.to_string(),
|
||||
table_name: delete_info.table_name.to_string(),
|
||||
delete_predicate: delete_info.delete_predicate.to_string(),
|
||||
start_time: delete_info.start_time.to_string(),
|
||||
stop_time: delete_info.stop_time.to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -158,21 +151,6 @@ impl From<management::operation_metadata::Job> for Job {
|
|||
partition_key: Arc::from(partition_key.as_str()),
|
||||
},
|
||||
},
|
||||
Job::Delete(management::Delete {
|
||||
db_name,
|
||||
table_name,
|
||||
delete_predicate,
|
||||
start_time,
|
||||
stop_time,
|
||||
}) => Self::Delete {
|
||||
delete_info: DeleteInfo {
|
||||
db_name: Arc::from(db_name.as_str()),
|
||||
table_name: Arc::from(table_name.as_str()),
|
||||
delete_predicate: Arc::from(delete_predicate.as_str()),
|
||||
start_time: Arc::from(start_time.as_str()),
|
||||
stop_time: Arc::from(stop_time.as_str()),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,10 +3,10 @@ use crate::{
|
|||
influxdata::iox::management::v1 as management,
|
||||
};
|
||||
|
||||
use influxdb_line_protocol::delete_parser::{ProvidedDeleteBinaryExpr, ProvidedDeleteOp, ProvidedParseDelete};
|
||||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
use influxdb_line_protocol::delete_parser::{
|
||||
ProvidedDeleteBinaryExpr, ProvidedDeleteOp, ProvidedParseDelete,
|
||||
};
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
|
||||
impl From<ProvidedDeleteOp> for management::DeleteOp {
|
||||
fn from(op: ProvidedDeleteOp) -> Self {
|
||||
|
@ -24,17 +24,14 @@ impl TryFrom<management::DeleteOp> for ProvidedDeleteOp {
|
|||
match proto {
|
||||
management::DeleteOp::Eq => Ok(Self::Eq),
|
||||
management::DeleteOp::NotEq => Ok(Self::NotEq),
|
||||
management::DeleteOp::Unspecified => Err(FieldViolation::required("")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ProvidedDeleteBinaryExpr> for management::DeleteBinaryExpr {
|
||||
fn from(bin_expr: ProvidedDeleteBinaryExpr) -> Self {
|
||||
let ProvidedDeleteBinaryExpr {
|
||||
column,
|
||||
op,
|
||||
value,
|
||||
} = bin_expr;
|
||||
let ProvidedDeleteBinaryExpr { column, op, value } = bin_expr;
|
||||
|
||||
Self {
|
||||
column,
|
||||
|
@ -48,14 +45,9 @@ impl TryFrom<management::DeleteBinaryExpr> for ProvidedDeleteBinaryExpr {
|
|||
type Error = FieldViolation;
|
||||
|
||||
fn try_from(proto: management::DeleteBinaryExpr) -> Result<Self, Self::Error> {
|
||||
let management::DeleteBinaryExpr { column, op, value } = proto;
|
||||
|
||||
let management::DeleteBinaryExpr {
|
||||
column,
|
||||
op,
|
||||
value
|
||||
} = proto;
|
||||
|
||||
Ok( Self {
|
||||
Ok(Self {
|
||||
column,
|
||||
op: management::DeleteOp::from_i32(op).required("op")?,
|
||||
value,
|
||||
|
@ -84,21 +76,18 @@ impl TryFrom<management::ParseDelete> for ProvidedParseDelete {
|
|||
type Error = FieldViolation;
|
||||
|
||||
fn try_from(proto: management::ParseDelete) -> Result<Self, Self::Error> {
|
||||
|
||||
let management::ParseDelete {
|
||||
start_time,
|
||||
stop_time,
|
||||
exprs
|
||||
} = proto;
|
||||
exprs,
|
||||
} = proto;
|
||||
|
||||
let pred_result: Result<Vec<ProvidedDeleteBinaryExpr>, Self::Error> = exprs
|
||||
.into_iter()
|
||||
.map(TryInto::try_into)
|
||||
.collect();
|
||||
let pred_result: Result<Vec<ProvidedDeleteBinaryExpr>, Self::Error> =
|
||||
exprs.into_iter().map(TryInto::try_into).collect();
|
||||
|
||||
let pred = pred_result?;
|
||||
|
||||
Ok( Self {
|
||||
Ok(Self {
|
||||
start_time,
|
||||
stop_time,
|
||||
predicate: pred,
|
||||
|
|
|
@ -13,4 +13,3 @@ pub mod operations;
|
|||
#[cfg(feature = "flight")]
|
||||
/// Client for query API (based on Arrow flight)
|
||||
pub mod flight;
|
||||
|
||||
|
|
|
@ -222,7 +222,7 @@ impl PerformQuery {
|
|||
pub async fn to_batches(&mut self) -> Result<Vec<RecordBatch>, Error> {
|
||||
let mut batches = Vec::new();
|
||||
while let Some(data) = self.next().await? {
|
||||
batches.push(data);
|
||||
batches.push(data);
|
||||
}
|
||||
|
||||
Ok(batches)
|
||||
|
|
|
@ -3,7 +3,7 @@ use thiserror::Error;
|
|||
use self::generated_types::{management_service_client::ManagementServiceClient, *};
|
||||
|
||||
use crate::connection::Connection;
|
||||
use ::generated_types::{google::longrunning::Operation};
|
||||
use ::generated_types::google::longrunning::Operation;
|
||||
use influxdb_line_protocol::delete_parser::{self, ProvidedParseDelete};
|
||||
|
||||
use std::convert::TryInto;
|
||||
|
@ -362,10 +362,10 @@ pub enum DropPartitionError {
|
|||
#[derive(Debug, Error)]
|
||||
pub enum DeleteError {
|
||||
/// Invalid time range or predicate
|
||||
/// The error message is sent back from the original one which
|
||||
/// The error message is sent back from the original one which
|
||||
/// will include the detail
|
||||
#[error("Invalid input: {}", .0)]
|
||||
ParseErr(delete_parser::Error),
|
||||
ParseErr(delete_parser::Error),
|
||||
|
||||
/// Database not found
|
||||
#[error("Not found: {}", .0)]
|
||||
|
@ -963,20 +963,26 @@ impl Client {
|
|||
let stop_time = stop_time.into();
|
||||
|
||||
// parse the time range and predicate
|
||||
let parse_delete_result = ProvidedParseDelete::parse_delete(start_time.as_str(), stop_time.as_str(), predicate.as_str());
|
||||
let parse_delete_result = ProvidedParseDelete::parse_delete(
|
||||
start_time.as_str(),
|
||||
stop_time.as_str(),
|
||||
predicate.as_str(),
|
||||
);
|
||||
match parse_delete_result {
|
||||
Err(e) => return Err(DeleteError::ParseErr(e)),
|
||||
Ok(parse_delete) => {
|
||||
let mgm_parse_delete = Some(parse_delete.into());
|
||||
self.inner
|
||||
.delete ( DeleteRequest {
|
||||
.delete(DeleteRequest {
|
||||
db_name,
|
||||
table_name,
|
||||
parse_delete: mgm_parse_delete,
|
||||
})
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
tonic::Code::NotFound => DeleteError::NotFound(status.message().to_string()),
|
||||
tonic::Code::NotFound => {
|
||||
DeleteError::NotFound(status.message().to_string())
|
||||
}
|
||||
tonic::Code::Unavailable => DeleteError::Unavailable(status),
|
||||
_ => DeleteError::ServerError(status),
|
||||
})?;
|
||||
|
|
|
@ -26,4 +26,3 @@ pub use client_util::connection;
|
|||
pub mod format;
|
||||
|
||||
mod client;
|
||||
|
||||
|
|
|
@ -22,11 +22,11 @@ use crate::timestamp;
|
|||
pub enum Error {
|
||||
/// Invalid time format
|
||||
#[error("Invalid timestamp: {}", .0)]
|
||||
InvalidTimestamp(String),
|
||||
InvalidTimestamp(String),
|
||||
|
||||
/// Invalid time range
|
||||
#[error("Invalid time range: ({}, {})", .0, .1)]
|
||||
InvalidTimeRange(String, String),
|
||||
/// Invalid time range
|
||||
#[error("Invalid time range: ({}, {})", .0, .1)]
|
||||
InvalidTimeRange(String, String),
|
||||
}
|
||||
|
||||
/// Result type for Parser Cient
|
||||
|
@ -41,7 +41,7 @@ pub struct ProvidedParseDelete {
|
|||
pub predicate: Vec<ProvidedDeleteBinaryExpr>,
|
||||
}
|
||||
|
||||
/// Single Binary expression of delete which
|
||||
/// Single Binary expression of delete which
|
||||
/// in the form of "column = value" or column != value"
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
|
||||
pub struct ProvidedDeleteBinaryExpr {
|
||||
|
@ -70,16 +70,17 @@ impl ProvidedDeleteOp {
|
|||
}
|
||||
|
||||
impl ProvidedParseDelete {
|
||||
|
||||
/// Create a ProvidedParseDelete
|
||||
pub fn new(start_time: i64, stop_time: i64, predicate: Vec<ProvidedDeleteBinaryExpr>) -> Self {
|
||||
Self {
|
||||
start_time, stop_time, predicate,
|
||||
start_time,
|
||||
stop_time,
|
||||
predicate,
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse and convert the delete grpc API into ProvidedParseDelete to send to server
|
||||
pub fn parse_delete(start: &str, stop: &str, predicate: &str) -> Result<Self>{
|
||||
pub fn parse_delete(start: &str, stop: &str, predicate: &str) -> Result<Self> {
|
||||
// parse and check time range
|
||||
let (start_time, stop_time) = Self::parse_time_range(start, stop)?;
|
||||
|
||||
|
@ -94,7 +95,7 @@ impl ProvidedParseDelete {
|
|||
Ok(vec![])
|
||||
}
|
||||
|
||||
/// Parse a time and return its time in nanosecond
|
||||
/// Parse a time and return its time in nanosecond
|
||||
pub fn parse_time(input: &str) -> Result<i64> {
|
||||
// This input can be in timestamp form that end with Z such as 1970-01-01T00:00:00Z
|
||||
// See examples here https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/delete/#delete-all-points-within-a-specified-time-frame
|
||||
|
@ -113,7 +114,7 @@ impl ProvidedParseDelete {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a time range [start, stop]
|
||||
|
@ -121,14 +122,13 @@ impl ProvidedParseDelete {
|
|||
let start_time = Self::parse_time(start)?;
|
||||
let stop_time = Self::parse_time(stop)?;
|
||||
if start_time > stop_time {
|
||||
return Err(Error::InvalidTimeRange(start.to_string(), stop.to_string()))
|
||||
return Err(Error::InvalidTimeRange(start.to_string(), stop.to_string()));
|
||||
}
|
||||
|
||||
|
||||
Ok((start_time, stop_time))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
@ -147,12 +147,18 @@ mod test {
|
|||
let expected = (100, 200);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let start = r#"1970-01-01T00:00:00Z"#;
|
||||
let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
let stop = r#"1970-01-01T00:00:00Z"#;
|
||||
let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
|
||||
let expected = (0, 0);
|
||||
assert_eq!(result, expected);
|
||||
|
||||
// let start = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
// let stop = r#"now()"#; // -- Not working. Need to find a way to test this
|
||||
// let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
|
||||
// let expected = (0, 0);
|
||||
// assert_eq!(result, expected);
|
||||
|
||||
let start = r#"1970-01-01T00:00:00Z"#;
|
||||
let stop = r#"100"#;
|
||||
let result = ProvidedParseDelete::parse_time_range(start, stop).unwrap();
|
||||
|
@ -174,17 +180,17 @@ mod test {
|
|||
assert!(result.is_err());
|
||||
|
||||
let start = r#"100"#;
|
||||
let stop = r#"50"#; // this is nano 0
|
||||
let stop = r#"50"#; // this is nano 0
|
||||
let result = ProvidedParseDelete::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
|
||||
let start = r#"100"#;
|
||||
let stop = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
let stop = r#"1970-01-01T00:00:00Z"#; // this is nano 0
|
||||
let result = ProvidedParseDelete::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
|
||||
let start = r#"1971-09-01T00:00:10Z"#;
|
||||
let stop = r#"1971-09-01T00:00:05Z"#; // this is nano 0
|
||||
let stop = r#"1971-09-01T00:00:05Z"#; // this is nano 0
|
||||
let result = ProvidedParseDelete::parse_time_range(start, stop);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
@ -242,5 +248,4 @@ mod test {
|
|||
let time = ProvidedParseDelete::parse_time(input);
|
||||
assert!(time.is_err());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ use nom::{
|
|||
combinator::{map, opt, recognize},
|
||||
multi::many0,
|
||||
sequence::{preceded, separated_pair, terminated, tuple},
|
||||
Parser,
|
||||
};
|
||||
use observability_deps::tracing::debug;
|
||||
use smallvec::SmallVec;
|
||||
|
@ -665,13 +664,6 @@ fn field_value(i: &str) -> IResult<&str, FieldValue<'_>> {
|
|||
alt((int, uint, float, string, boolv))(i)
|
||||
}
|
||||
|
||||
fn delete_predicate_field_key(i: &str) -> IResult<&str, EscapedStr<'_>> {
|
||||
let normal_char = take_while1(|c| {
|
||||
!is_whitespace_boundary_char(c) && c != '=' && c != '\\' && c != '!' && c != '<' && c != '>'
|
||||
});
|
||||
escaped_value(normal_char)(i)
|
||||
}
|
||||
|
||||
fn field_integer_value(i: &str) -> IResult<&str, i64> {
|
||||
let tagged_value = terminated(integral_value_signed, tag("i"));
|
||||
map_fail(tagged_value, |value| {
|
||||
|
@ -736,107 +728,6 @@ pub fn timestamp(i: &str) -> IResult<&str, i64> {
|
|||
})(i)
|
||||
}
|
||||
|
||||
/// Parse delete predicate which is in form of conjunctive expressions with
|
||||
/// columns being compared to literals using = or != operators.
|
||||
///
|
||||
/// Example: city = "Boston" AND building != 12 and month = 10
|
||||
///
|
||||
/// This function is currently used to support Delete's gPRC API only.
|
||||
/// In the future, when we support all kinds of delete predicates supported in
|
||||
/// Datafusion, we may want to reuse the parser used by Datafusion to keep
|
||||
/// things consistent
|
||||
pub fn parse_delete_predicate(input: &str) -> IResult<&str, Vec<BinaryExpr>> {
|
||||
// Or expression is invalid
|
||||
if input.contains(" or ")
|
||||
|| input.contains(" OR ")
|
||||
|| input.contains(" Or ")
|
||||
|| input.contains(" oR ")
|
||||
{
|
||||
return Err(nom::Err::Error(Error::OrInDeleteExpression {
|
||||
value: input.to_string(),
|
||||
}));
|
||||
}
|
||||
|
||||
// This is very silly and expensive
|
||||
let i = input.replace(" AND ", " and ");
|
||||
let i = i.replace(" And ", " and ");
|
||||
let i = i.replace(" AnD ", " and ");
|
||||
let i = i.replace(" aNd ", " and ");
|
||||
let i = i.replace(" aND ", " and ");
|
||||
let i = i.replace(" anD ", " and ");
|
||||
|
||||
// Convert everything after "and" into an expression string
|
||||
let expr_strings: Vec<&str> = i.split(" and ").collect();
|
||||
|
||||
let mut exprs: Vec<BinaryExpr> = vec![];
|
||||
for expr_str in expr_strings {
|
||||
let expr = parse_delete_expression(expr_str)?;
|
||||
exprs.push(expr.1);
|
||||
}
|
||||
Ok((input, exprs))
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct BinaryExpr {
|
||||
pub column_name: String,
|
||||
pub op: String, // either "=" or "!="
|
||||
pub literal: String, // constant
|
||||
}
|
||||
|
||||
impl BinaryExpr {
|
||||
pub fn new(col: &str, o: &str, lit: &str) -> Self {
|
||||
Self {
|
||||
column_name: col.to_string(),
|
||||
op: o.to_string(),
|
||||
literal: lit.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a single binary expression of a column being compared to literals using = or != operators
|
||||
///
|
||||
/// Note that we currently not allow white spaces in the middle of the expression
|
||||
///
|
||||
/// Examples of valid expressions:
|
||||
/// " col=1000 "
|
||||
/// r#"col!="abc""# -- Not there must be double quotes around abc
|
||||
///
|
||||
/// Examples of invalid expressions:
|
||||
/// "col =100" -- white space after col
|
||||
/// "col= 100" -- white space before 100
|
||||
/// "col>100" -- greater
|
||||
/// "col<=100" -- less and equal
|
||||
/// "col=abc" -- abc is not embedded in a double quote
|
||||
/// "col='abc'" -- abc is not embedded in a double quote. The single quote won't work
|
||||
fn parse_delete_expression(input: &str) -> IResult<&str, BinaryExpr> {
|
||||
// trim white spaces in front and at the end
|
||||
let input = input.trim();
|
||||
|
||||
if input.contains(' ') {
|
||||
return Err(nom::Err::Error(Error::WhiteSpaceInDeleteExpression {
|
||||
value: input.to_string(),
|
||||
}));
|
||||
}
|
||||
|
||||
// See if this is an equality comparison
|
||||
let expr = separated_pair(delete_predicate_field_key, tag("="), field_value).parse(input);
|
||||
if expr.is_err() {
|
||||
// See if this is an inequality comparison
|
||||
let expr =
|
||||
separated_pair(delete_predicate_field_key, tag("!="), field_value).parse(input)?;
|
||||
return Ok((
|
||||
input,
|
||||
BinaryExpr::new(expr.1 .0.as_str(), "!=", expr.1 .1.to_string().as_str()),
|
||||
));
|
||||
}
|
||||
|
||||
let expr = expr.unwrap();
|
||||
Ok((
|
||||
input,
|
||||
BinaryExpr::new(expr.1 .0.as_str(), "=", expr.1 .1.to_string().as_str()),
|
||||
))
|
||||
}
|
||||
|
||||
fn field_string_value(i: &str) -> IResult<&str, EscapedStr<'_>> {
|
||||
// https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
|
||||
// For string field values, backslash is only used to escape itself(\) or double
|
||||
|
@ -1177,7 +1068,6 @@ fn escape_and_write_value(
|
|||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use chrono::DateTime;
|
||||
use smallvec::smallvec;
|
||||
use test_helpers::approximately_equal;
|
||||
|
||||
|
@ -2330,135 +2220,4 @@ her"#,
|
|||
|
||||
assert_eq!(vals[0].tag_value("asdf"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_expression() {
|
||||
// Positive tests: valid expressions
|
||||
let input = "col=123";
|
||||
let result = parse_delete_expression(input).unwrap().1;
|
||||
let expected = BinaryExpr::new("col", "=", "123");
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = " col=123 ";
|
||||
let result = parse_delete_expression(input).unwrap().1;
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = " CoL=123 ";
|
||||
let result = parse_delete_expression(input).unwrap().1;
|
||||
let expected = BinaryExpr::new("CoL", "=", "123");
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = "col!=123";
|
||||
let result = parse_delete_expression(input).unwrap().1;
|
||||
let expected = BinaryExpr::new("col", "!=", "123");
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = "col2!=123";
|
||||
let result = parse_delete_expression(input).unwrap().1;
|
||||
let expected = BinaryExpr::new("col2", "!=", "123");
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = r#"col2!="abc""#;
|
||||
let result = parse_delete_expression(input).unwrap().1;
|
||||
let expected = BinaryExpr::new("col2", "!=", "abc");
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = r#"col2="abc""#;
|
||||
let result = parse_delete_expression(input).unwrap().1;
|
||||
let expected = BinaryExpr::new("col2", "=", "abc");
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = " CoL!=123 ";
|
||||
let result = parse_delete_expression(input).unwrap().1;
|
||||
let expected = BinaryExpr::new("CoL", "!=", "123");
|
||||
assert_eq!(result, expected);
|
||||
|
||||
// Negative tests: Invalid expressions
|
||||
let input = "random";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "random phrase";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "col2!=abc"; // abc not in double quote
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "col = 123"; // white space before and after "=""
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = " col = 123 ";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = " CoL= 123 ";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = " col != 123 ";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "col != 123";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "col>123";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "col>=123";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "col<123";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "col<=123";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = "col between 123";
|
||||
let result = parse_delete_expression(input);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete_predicate() {
|
||||
// Positive tests: valid predicate
|
||||
let input = "col=123";
|
||||
let result = parse_delete_predicate(input).unwrap().1;
|
||||
let expected = vec![BinaryExpr::new("col", "=", "123")];
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = r#"col=123 and col2!="abc""#;
|
||||
let result = parse_delete_predicate(input).unwrap().1;
|
||||
let expected = vec![
|
||||
BinaryExpr::new("col", "=", "123"),
|
||||
BinaryExpr::new("col2", "!=", "abc"),
|
||||
];
|
||||
assert_eq!(result, expected);
|
||||
|
||||
let input = r#"temp=89.1 and city!="Boston" aNd day="Monday""#;
|
||||
let result = parse_delete_predicate(input).unwrap().1;
|
||||
let expected = vec![
|
||||
BinaryExpr::new("temp", "=", "89.1"),
|
||||
BinaryExpr::new("city", "!=", "Boston"),
|
||||
BinaryExpr::new("day", "=", "Monday"),
|
||||
];
|
||||
assert_eq!(result, expected);
|
||||
|
||||
// Negative tests: invalid predicate
|
||||
let input = r#"temp=89.1 or city!="Boston""#; // OR
|
||||
let result = parse_delete_predicate(input);
|
||||
assert!(result.is_err());
|
||||
|
||||
let input = r#"temp=89.1 AND city!="Boston" OR day="Monday"#; // OR
|
||||
let result = parse_delete_predicate(input);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -223,8 +223,7 @@ pub enum Error {
|
|||
DeleteExpression { expr: String },
|
||||
|
||||
#[snafu(display("delete start time, stop time, and predicate are not provided:"))]
|
||||
EmptyDeletePredicate { },
|
||||
|
||||
EmptyDeletePredicate {},
|
||||
// #[snafu(display("error while converting Parse Delete: {}", source))]
|
||||
// ParseDeleteConverting { source: String },
|
||||
}
|
||||
|
|
|
@ -189,7 +189,6 @@ enum Command {
|
|||
|
||||
/// Unload chunk from read buffer but keep it in object store.
|
||||
UnloadChunk(UnloadChunk),
|
||||
// NGA:todo - Debating: Should we add the Delete command that deletes data for table of this partition?
|
||||
}
|
||||
|
||||
pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||
|
|
|
@ -566,13 +566,12 @@ where
|
|||
parse_delete,
|
||||
} = request.into_inner();
|
||||
|
||||
let parse_delete = parse_delete
|
||||
.ok_or_else(|| {
|
||||
tonic::Status::unavailable(format!(
|
||||
"Delete Predicate has not yet been loaded for table ({}) of database ({})",
|
||||
table_name, db_name
|
||||
))
|
||||
})?;
|
||||
let parse_delete = parse_delete.ok_or_else(|| {
|
||||
tonic::Status::unavailable(format!(
|
||||
"Delete Predicate has not yet been loaded for table ({}) of database ({})",
|
||||
table_name, db_name
|
||||
))
|
||||
})?;
|
||||
|
||||
let provided_parse_delete: ProvidedParseDelete = parse_delete
|
||||
.try_into()
|
||||
|
@ -588,9 +587,12 @@ where
|
|||
// NGA todo: need to validate if the table and all of its columns in delete predicate are legit?
|
||||
|
||||
// Build the predicate
|
||||
let mut del_predicate = PredicateBuilder::new()
|
||||
let mut del_predicate = PredicateBuilder::new()
|
||||
.table(table_name.clone())
|
||||
.timestamp_range(provided_parse_delete.start_time, provided_parse_delete.stop_time)
|
||||
.timestamp_range(
|
||||
provided_parse_delete.start_time,
|
||||
provided_parse_delete.stop_time,
|
||||
)
|
||||
.build();
|
||||
// Add the predicate binary expressions
|
||||
for expr in provided_parse_delete.predicate {
|
||||
|
|
|
@ -1398,7 +1398,7 @@ async fn test_delete() {
|
|||
"cpu,region=west user=21.0 150",
|
||||
"disk,region=east bytes=99i 200",
|
||||
];
|
||||
|
||||
|
||||
let num_lines_written = write_client
|
||||
.write(&db_name, lp_lines.join("\n"))
|
||||
.await
|
||||
|
@ -1409,7 +1409,8 @@ async fn test_delete() {
|
|||
// Query cpu
|
||||
let mut query_results = flight_client
|
||||
.perform_query(db_name.clone(), "select * from cpu")
|
||||
.await.unwrap();
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = query_results.to_batches().await.unwrap();
|
||||
let expected = [
|
||||
"+--------+--------------------------------+------+",
|
||||
|
@ -1420,14 +1421,15 @@ async fn test_delete() {
|
|||
"+--------+--------------------------------+------+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
|
||||
|
||||
// Delete some data
|
||||
// todo
|
||||
|
||||
// query to verify data deleted
|
||||
let mut query_results = flight_client
|
||||
.perform_query(db_name, "select * from cpu")
|
||||
.await.unwrap();
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = query_results.to_batches().await.unwrap();
|
||||
let expected = [
|
||||
"+--------+--------------------------------+------+",
|
||||
|
|
Loading…
Reference in New Issue