refactor: remove unused fields from `DeletePredicate`
parent
d10d8e8a45
commit
10c1a72402
|
@ -3,15 +3,14 @@ package influxdata.iox.catalog.v1;
|
|||
|
||||
// Represents a parsed predicate for evaluation by the InfluxDB IOx query engine.
|
||||
message Predicate {
|
||||
// Optional table restriction. If present, restricts the results to only tables these tables.
|
||||
OptionalStringSet table_names = 1;
|
||||
// Was `table_names`.
|
||||
reserved 1;
|
||||
|
||||
// Optional field restriction. If present, restricts the results to only tables which have *at least one* of the
|
||||
// fields in field_columns.
|
||||
OptionalStringSet field_columns = 2;
|
||||
// Was `field_columns`.
|
||||
reserved 2;
|
||||
|
||||
// Optional partition key filter
|
||||
OptionalString partition_key = 3;
|
||||
// Was `partition_key`.
|
||||
reserved 3;
|
||||
|
||||
// Optional timestamp range: only rows within this range are included in results. Other rows are excluded.
|
||||
TimestampRange range = 4;
|
||||
|
|
|
@ -1714,12 +1714,12 @@ mod tests {
|
|||
}
|
||||
|
||||
// create two predicate
|
||||
let predicate_1 = create_delete_predicate(&chunk_addrs[0].table_name, 42);
|
||||
let predicate_1 = create_delete_predicate(42);
|
||||
let chunks_1 = vec![chunk_addrs[0].clone().into()];
|
||||
t.delete_predicate(&predicate_1, &chunks_1).unwrap();
|
||||
state.delete_predicate(predicate_1, chunks_1);
|
||||
|
||||
let predicate_2 = create_delete_predicate(&chunk_addrs[0].table_name, 1337);
|
||||
let predicate_2 = create_delete_predicate(1337);
|
||||
let chunks_2 = vec![chunk_addrs[0].clone().into(), chunk_addrs[1].clone().into()];
|
||||
t.delete_predicate(&predicate_2, &chunks_2).unwrap();
|
||||
state.delete_predicate(predicate_2, chunks_2);
|
||||
|
|
|
@ -525,13 +525,13 @@ where
|
|||
expected_files.insert(chunk_addr_2.chunk_id, (path, Arc::new(metadata)));
|
||||
|
||||
// first predicate used only a single chunk
|
||||
let predicate_1 = create_delete_predicate(&chunk_addr_1.table_name, 1);
|
||||
let predicate_1 = create_delete_predicate(1);
|
||||
let chunks_1 = vec![chunk_addr_1.clone().into()];
|
||||
state.delete_predicate(Arc::clone(&predicate_1), chunks_1.clone());
|
||||
expected_predicates.push((predicate_1, chunks_1));
|
||||
|
||||
// second predicate uses both chunks (but not the older chunks)
|
||||
let predicate_2 = create_delete_predicate(&chunk_addr_2.table_name, 2);
|
||||
let predicate_2 = create_delete_predicate(2);
|
||||
let chunks_2 = vec![chunk_addr_1.into(), chunk_addr_2.into()];
|
||||
state.delete_predicate(Arc::clone(&predicate_2), chunks_2.clone());
|
||||
expected_predicates.push((predicate_2, chunks_2));
|
||||
|
@ -579,7 +579,7 @@ where
|
|||
// Registering predicates for unknown chunks is just ignored because chunks might been in "persisting" intermediate
|
||||
// state while the predicate was reported.
|
||||
{
|
||||
let predicate = create_delete_predicate("some_table", 1);
|
||||
let predicate = create_delete_predicate(1);
|
||||
let chunks = vec![ChunkAddrWithoutDatabase {
|
||||
table_name: Arc::from("some_table"),
|
||||
partition_key: Arc::from("part"),
|
||||
|
@ -638,14 +638,8 @@ fn get_sorted_keys<'a>(
|
|||
}
|
||||
|
||||
/// Helper to create a simple delete predicate.
|
||||
pub fn create_delete_predicate(table_name: &str, value: i64) -> Arc<DeletePredicate> {
|
||||
pub fn create_delete_predicate(value: i64) -> Arc<DeletePredicate> {
|
||||
Arc::new(DeletePredicate {
|
||||
table_names: Some(
|
||||
IntoIterator::into_iter([table_name.to_string(), format!("not_{}", table_name)])
|
||||
.collect(),
|
||||
),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 11, end: 22 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"foo".to_string(),
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{collections::BTreeSet, convert::TryInto};
|
||||
use std::convert::TryInto;
|
||||
|
||||
use chrono::DateTime;
|
||||
use data_types::timestamp::TimestampRange;
|
||||
|
@ -70,17 +70,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// query engine.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct DeletePredicate {
|
||||
/// Optional table restriction. If present, restricts the results
|
||||
/// to only tables whose names are in `table_names`
|
||||
pub table_names: Option<BTreeSet<String>>,
|
||||
|
||||
/// Optional field restriction. If present, restricts the results to only
|
||||
/// tables which have *at least one* of the fields in field_columns.
|
||||
pub field_columns: Option<BTreeSet<String>>,
|
||||
|
||||
/// Optional partition key filter
|
||||
pub partition_key: Option<String>,
|
||||
|
||||
/// Only rows within this range are included in
|
||||
/// results. Other rows are excluded.
|
||||
pub range: TimestampRange,
|
||||
|
@ -96,9 +85,9 @@ pub struct DeletePredicate {
|
|||
impl From<DeletePredicate> for crate::predicate::Predicate {
|
||||
fn from(pred: DeletePredicate) -> Self {
|
||||
Self {
|
||||
table_names: pred.table_names,
|
||||
field_columns: pred.field_columns,
|
||||
partition_key: pred.partition_key,
|
||||
table_names: None,
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: Some(pred.range),
|
||||
exprs: pred.exprs.into_iter().map(|expr| expr.into()).collect(),
|
||||
}
|
||||
|
@ -329,9 +318,6 @@ impl ParseDeletePredicate {
|
|||
impl From<ParseDeletePredicate> for DeletePredicate {
|
||||
fn from(pred: ParseDeletePredicate) -> Self {
|
||||
Self {
|
||||
table_names: None,
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange {
|
||||
start: pred.start_time,
|
||||
end: pred.stop_time,
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
//!
|
||||
//! [Ballista]: https://github.com/apache/arrow-datafusion/blob/22fcb3d7a68a56afbe12eab9e7d98f7b8de33703/ballista/rust/core/proto/ballista.proto
|
||||
//! [Protocol Buffers 3]: https://developers.google.com/protocol-buffers/docs/proto3
|
||||
use std::{collections::BTreeSet, convert::TryInto};
|
||||
use std::convert::TryInto;
|
||||
|
||||
use data_types::timestamp::TimestampRange;
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
|
@ -17,9 +17,6 @@ use crate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate};
|
|||
/// Serialize IOx [`DeletePredicate`] to a protobuf object.
|
||||
pub fn serialize(predicate: &DeletePredicate) -> proto::Predicate {
|
||||
proto::Predicate {
|
||||
table_names: serialize_optional_string_set(&predicate.table_names),
|
||||
field_columns: serialize_optional_string_set(&predicate.field_columns),
|
||||
partition_key: serialize_optional_string(&predicate.partition_key),
|
||||
range: Some(proto::TimestampRange {
|
||||
start: predicate.range.start,
|
||||
end: predicate.range.end,
|
||||
|
@ -32,19 +29,6 @@ pub fn serialize(predicate: &DeletePredicate) -> proto::Predicate {
|
|||
}
|
||||
}
|
||||
|
||||
fn serialize_optional_string_set(
|
||||
set: &Option<BTreeSet<String>>,
|
||||
) -> Option<proto::OptionalStringSet> {
|
||||
set.as_ref().map(|set| proto::OptionalStringSet {
|
||||
values: set.iter().cloned().collect(),
|
||||
})
|
||||
}
|
||||
|
||||
fn serialize_optional_string(s: &Option<String>) -> Option<proto::OptionalString> {
|
||||
s.as_ref()
|
||||
.map(|s| proto::OptionalString { value: s.clone() })
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum DeserializeError {
|
||||
#[snafu(display("timestamp range is missing"))]
|
||||
|
@ -61,9 +45,6 @@ pub fn deserialize(
|
|||
proto_predicate: &proto::Predicate,
|
||||
) -> Result<DeletePredicate, DeserializeError> {
|
||||
let predicate = DeletePredicate {
|
||||
table_names: deserialize_optional_string_set(&proto_predicate.table_names),
|
||||
field_columns: deserialize_optional_string_set(&proto_predicate.field_columns),
|
||||
partition_key: deserialize_optional_string(&proto_predicate.partition_key),
|
||||
range: proto_predicate
|
||||
.range
|
||||
.as_ref()
|
||||
|
@ -84,16 +65,6 @@ pub fn deserialize(
|
|||
Ok(predicate)
|
||||
}
|
||||
|
||||
fn deserialize_optional_string_set(
|
||||
set: &Option<proto::OptionalStringSet>,
|
||||
) -> Option<BTreeSet<String>> {
|
||||
set.as_ref().map(|set| set.values.iter().cloned().collect())
|
||||
}
|
||||
|
||||
fn deserialize_optional_string(s: &Option<proto::OptionalString>) -> Option<String> {
|
||||
s.as_ref().map(|s| s.value.clone())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::delete_predicate::ParseDeletePredicate;
|
||||
|
@ -102,14 +73,13 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_roundtrip() {
|
||||
let table_name = "my_table";
|
||||
let predicate = delete_predicate(table_name);
|
||||
let predicate = delete_predicate();
|
||||
let proto = serialize(&predicate);
|
||||
let recovered = deserialize(&proto).unwrap();
|
||||
assert_eq!(predicate, recovered);
|
||||
}
|
||||
|
||||
fn delete_predicate(table_name: &str) -> DeletePredicate {
|
||||
fn delete_predicate() -> DeletePredicate {
|
||||
let start_time = "11";
|
||||
let stop_time = "22";
|
||||
let predicate = r#"city=Boston and cost!=100 and temp=87.5 and good=true"#;
|
||||
|
@ -117,8 +87,6 @@ mod tests {
|
|||
let parse_delete_pred =
|
||||
ParseDeletePredicate::try_new(start_time, stop_time, predicate).unwrap();
|
||||
|
||||
let mut pred: DeletePredicate = parse_delete_pred.into();
|
||||
pred.table_names = Some(IntoIterator::into_iter([table_name.to_string()]).collect());
|
||||
pred
|
||||
parse_delete_pred.into()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,9 +34,6 @@ impl DbSetup for OneDeleteSimpleExprOneChunkDeleteAll {
|
|||
|
||||
// delete predicate
|
||||
let pred = DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 10, end: 20 },
|
||||
exprs: vec![],
|
||||
};
|
||||
|
@ -61,9 +58,6 @@ impl DbSetup for OneDeleteSimpleExprOneChunk {
|
|||
|
||||
// delete predicate
|
||||
let pred = DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 0, end: 15 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"bar".to_string(),
|
||||
|
@ -95,9 +89,6 @@ impl DbSetup for OneDeleteMultiExprsOneChunk {
|
|||
];
|
||||
// delete predicate
|
||||
let pred = DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 0, end: 30 },
|
||||
exprs: vec![
|
||||
DeleteExpr::new(
|
||||
|
@ -142,9 +133,6 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
|
|||
// delete predicate
|
||||
// pred1: delete from cpu where 0 <= time <= 32 and bar = 1 and foo = 'me'
|
||||
let pred1 = DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 0, end: 32 },
|
||||
exprs: vec![
|
||||
DeleteExpr::new(
|
||||
|
@ -162,9 +150,6 @@ impl DbSetup for TwoDeletesMultiExprsOneChunk {
|
|||
|
||||
// pred2: delete from cpu where 10 <= time <= 40 and bar != 1
|
||||
let pred2 = DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 10, end: 40 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"bar".to_string(),
|
||||
|
@ -205,9 +190,6 @@ impl DbSetup for ThreeDeleteThreeChunks {
|
|||
];
|
||||
// delete predicate on chunk 1
|
||||
let pred1 = DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 0, end: 30 },
|
||||
exprs: vec![
|
||||
DeleteExpr::new(
|
||||
|
@ -232,9 +214,6 @@ impl DbSetup for ThreeDeleteThreeChunks {
|
|||
];
|
||||
// delete predicate on chunk 1 & chunk 2
|
||||
let pred2 = DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 20, end: 45 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"foo".to_string(),
|
||||
|
@ -252,9 +231,6 @@ impl DbSetup for ThreeDeleteThreeChunks {
|
|||
];
|
||||
// delete predicate on chunk 3
|
||||
let pred3 = DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 75, end: 95 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"bar".to_string(),
|
||||
|
|
|
@ -3682,9 +3682,6 @@ mod tests {
|
|||
|
||||
// ==================== do: delete ====================
|
||||
let pred = Arc::new(DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange {
|
||||
start: 0,
|
||||
end: 1_000,
|
||||
|
|
|
@ -1084,9 +1084,6 @@ mod tests {
|
|||
|
||||
// Build delete predicate and expected output
|
||||
let del_pred1 = Arc::new(DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["test".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 0, end: 100 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"city".to_string(),
|
||||
|
@ -1109,9 +1106,6 @@ mod tests {
|
|||
// let add more delete predicate = simulate second delete
|
||||
// Build delete predicate and expected output
|
||||
let del_pred2 = Arc::new(DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["test".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 20, end: 50 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"cost".to_string(),
|
||||
|
|
|
@ -236,9 +236,6 @@ mod tests {
|
|||
|
||||
// Cannot simply use empty predicate (#2687)
|
||||
let predicate = Arc::new(DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange {
|
||||
start: 0,
|
||||
end: 1_000,
|
||||
|
|
|
@ -303,9 +303,6 @@ mod tests {
|
|||
|
||||
// Delete first row
|
||||
let predicate = Arc::new(DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange { start: 0, end: 20 },
|
||||
exprs: vec![],
|
||||
});
|
||||
|
@ -369,9 +366,6 @@ mod tests {
|
|||
|
||||
// Delete everything
|
||||
let predicate = Arc::new(DeletePredicate {
|
||||
table_names: Some(IntoIterator::into_iter(["cpu".to_string()]).collect()),
|
||||
field_columns: None,
|
||||
partition_key: None,
|
||||
range: TimestampRange {
|
||||
start: 0,
|
||||
end: 1_000,
|
||||
|
|
Loading…
Reference in New Issue