fix: fix the cases of multiple expressions in delete predicate
parent
7175488133
commit
61e1eac135
|
@ -175,11 +175,24 @@ impl Predicate {
|
|||
}
|
||||
}
|
||||
|
||||
/// Add a list of not(a delete expr)
|
||||
/// Add a list of disjunctive negated expressions.
|
||||
/// Example: there are two deletes as follows
|
||||
/// . Delete_1: WHERE city != "Boston" AND temp = 70
|
||||
/// . Delete_2: WHERE state = "NY" AND route != "I90"
|
||||
/// The negated list will be "NOT(Delete_1)", "NOT(Delete_2)" which means
|
||||
/// NOT(city != "Boston" AND temp = 70), NOT(state = "NY" AND route != "I90") which means
|
||||
/// [NOT(city != "Boston") OR NOT(temp = 70)], [NOT(state = "NY") OR NOT(route != "I90")]
|
||||
pub fn add_delete_exprs(&mut self, delete_predicates: &[Self]) {
|
||||
for pred in delete_predicates {
|
||||
let mut expr: Option<Expr> = None;
|
||||
for exp in &pred.exprs {
|
||||
self.exprs.push(exp.clone().not());
|
||||
match expr {
|
||||
None => expr = Some(exp.clone().not()),
|
||||
Some(e) => expr = Some(e.or(exp.clone().not())),
|
||||
}
|
||||
}
|
||||
if let Some(e) = expr {
|
||||
self.exprs.push(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
136
server/src/db.rs
136
server/src/db.rs
|
@ -3835,7 +3835,7 @@ mod tests {
|
|||
assert_batches_sorted_eq!(expected, &batches); // this fails when the todo is done
|
||||
|
||||
// ---
|
||||
// Since chunk was frozen when the delete was issued, nothing will be frozen futher
|
||||
// Since chunk was frozen when the delete was issued, nothing will be frozen further
|
||||
let mb_open_chunk = db.rollover_partition("cpu", partition_key).await.unwrap();
|
||||
assert!(mb_open_chunk.is_none());
|
||||
// Verify there is MUB but no RUB no OS
|
||||
|
@ -3908,4 +3908,138 @@ mod tests {
|
|||
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
|
||||
assert_batches_sorted_eq!(expected, &batches);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_on_different_chunk_types_multi_exprs() {
|
||||
::test_helpers::maybe_start_logging();
|
||||
|
||||
let db = make_db().await.db;
|
||||
// load 4 lines which belong to partition starting with 0 nanosecond or "1970-01-01T00"
|
||||
let lines = vec![
|
||||
"cpu,foo=me bar=1 10",
|
||||
"cpu,foo=you bar=2 20",
|
||||
"cpu,foo=me bar=1 30",
|
||||
"cpu,foo=me bar=1 40",
|
||||
];
|
||||
write_lp(db.as_ref(), &lines.join("\n")).await;
|
||||
let partition_key = "1970-01-01T00";
|
||||
assert_eq!(vec![partition_key], db.partition_keys().unwrap());
|
||||
|
||||
// ----
|
||||
// There is one open mub chunk
|
||||
// Verify there is MUB but no RUB no OS
|
||||
assert!(!mutable_chunk_ids(&db, partition_key).is_empty());
|
||||
assert!(read_buffer_chunk_ids(&db, partition_key).is_empty());
|
||||
assert!(read_parquet_file_chunk_ids(&db, partition_key).is_empty());
|
||||
// Let's query it
|
||||
let expected = vec![
|
||||
"+-----+-----+--------------------------------+",
|
||||
"| bar | foo | time |",
|
||||
"+-----+-----+--------------------------------+",
|
||||
"| 1 | me | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| 1 | me | 1970-01-01T00:00:00.000000030Z |",
|
||||
"| 1 | me | 1970-01-01T00:00:00.000000040Z |",
|
||||
"| 2 | you | 1970-01-01T00:00:00.000000020Z |",
|
||||
"+-----+-----+--------------------------------+",
|
||||
];
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
|
||||
assert_batches_sorted_eq!(expected, &batches);
|
||||
|
||||
// ---
|
||||
// Delete the rows with bar = 1 and foo = "me" whose timestamps fall in the given range (the rows at timestamps 10 and 30)
|
||||
let i: f64 = 1.0;
|
||||
let expr1 = col("bar").eq(lit(i));
|
||||
let expr2 = col("foo").eq(lit("me"));
|
||||
let pred = PredicateBuilder::new()
|
||||
.table("cpu")
|
||||
.timestamp_range(0, 32)
|
||||
.add_expr(expr1)
|
||||
.add_expr(expr2)
|
||||
.build();
|
||||
db.delete("cpu", &pred).await.unwrap();
|
||||
// When the above delete is issued, the open mub chunk is frozen with the delete predicate added
|
||||
// Verify there is MUB but no RUB no OS
|
||||
assert!(!mutable_chunk_ids(&db, partition_key).is_empty());
|
||||
assert!(read_buffer_chunk_ids(&db, partition_key).is_empty());
|
||||
assert!(read_parquet_file_chunk_ids(&db, partition_key).is_empty());
|
||||
// Let's query again
|
||||
// NGA todo: since MUB does not support predicate push down, the filtering needs to be done at scan step
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
|
||||
assert_batches_sorted_eq!(expected, &batches); // this fails when the todo is done
|
||||
|
||||
// ---
|
||||
// Since chunk was frozen when the delete was issued, nothing will be frozen further
|
||||
let mb_open_chunk = db.rollover_partition("cpu", partition_key).await.unwrap();
|
||||
assert!(mb_open_chunk.is_none());
|
||||
// Verify there is MUB but no RUB no OS
|
||||
assert!(!mutable_chunk_ids(&db, partition_key).is_empty());
|
||||
assert!(read_buffer_chunk_ids(&db, partition_key).is_empty());
|
||||
assert!(read_parquet_file_chunk_ids(&db, partition_key).is_empty());
|
||||
|
||||
// ---
|
||||
// Let's move MUB to RUB. The delete predicate will be included in RUB
|
||||
let mb_chunk_id = 0;
|
||||
let _mb_chunk = db.chunk("cpu", partition_key, mb_chunk_id).unwrap();
|
||||
let rb_chunk = db
|
||||
.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk_id)
|
||||
.await
|
||||
.unwrap();
|
||||
// Verify there is RUB but no MUB no OS
|
||||
assert!(mutable_chunk_ids(&db, partition_key).is_empty());
|
||||
assert_eq!(
|
||||
read_buffer_chunk_ids(&db, partition_key),
|
||||
vec![rb_chunk.id()]
|
||||
);
|
||||
assert!(read_parquet_file_chunk_ids(&db, partition_key).is_empty());
|
||||
// Let query again and since RUB supports delete predicate push down, we should see only 2 rows
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
|
||||
let expected = vec![
|
||||
"+-----+-----+--------------------------------+",
|
||||
"| bar | foo | time |",
|
||||
"+-----+-----+--------------------------------+",
|
||||
"| 1 | me | 1970-01-01T00:00:00.000000040Z |",
|
||||
"| 2 | you | 1970-01-01T00:00:00.000000020Z |",
|
||||
"+-----+-----+--------------------------------+",
|
||||
];
|
||||
assert_batches_sorted_eq!(expected, &batches);
|
||||
|
||||
// ---
|
||||
// Write the RUB chunk to Object Store but keep it in RUB
|
||||
let pq_chunk = db
|
||||
.persist_partition(
|
||||
"cpu",
|
||||
partition_key,
|
||||
Instant::now() + Duration::from_secs(1),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let pq_chunk_id = pq_chunk.id();
|
||||
// we should have chunks in both RUB and OS
|
||||
assert!(mutable_chunk_ids(&db, partition_key).is_empty());
|
||||
assert_eq!(read_buffer_chunk_ids(&db, partition_key), vec![pq_chunk_id]);
|
||||
assert_eq!(
|
||||
read_parquet_file_chunk_ids(&db, partition_key),
|
||||
vec![pq_chunk_id]
|
||||
);
|
||||
// Let query again from RUB and should see only 2 rows
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
|
||||
assert_batches_sorted_eq!(expected, &batches);
|
||||
|
||||
// ---
|
||||
// Unload RB chunk but keep it in OS
|
||||
let pq_chunk = db
|
||||
.unload_read_buffer("cpu", partition_key, pq_chunk_id)
|
||||
.unwrap();
|
||||
|
||||
// we should only have chunk in os
|
||||
assert!(mutable_chunk_ids(&db, partition_key).is_empty());
|
||||
assert!(read_buffer_chunk_ids(&db, partition_key).is_empty());
|
||||
assert_eq!(
|
||||
read_parquet_file_chunk_ids(&db, partition_key),
|
||||
vec![pq_chunk.id()]
|
||||
);
|
||||
// Let's query again, this time from OS; we should also see only 2 rows (OS prunes partition)
|
||||
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
|
||||
assert_batches_sorted_eq!(expected, &batches);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -365,12 +365,11 @@ impl QueryChunk for DbChunk {
|
|||
}
|
||||
State::ReadBuffer { chunk, .. } => {
|
||||
// Only apply pushdownable predicates
|
||||
let rb_predicate = match to_read_buffer_predicate(&pred_with_deleted_ranges)
|
||||
.context(PredicateConversion)
|
||||
{
|
||||
Ok(predicate) => predicate,
|
||||
Err(_) => read_buffer::Predicate::default(),
|
||||
};
|
||||
let rb_predicate =
|
||||
match to_read_buffer_predicate(predicate).context(PredicateConversion) {
|
||||
Ok(predicate) => predicate,
|
||||
Err(_) => read_buffer::Predicate::default(),
|
||||
};
|
||||
debug!(?rb_predicate, "Predicate pushed down to RUB");
|
||||
|
||||
// combine all delete expressions to RUB's negated ones
|
||||
|
|
Loading…
Reference in New Issue