test: propogate delete tests for compact OS chunks
parent
b0209137e6
commit
4e86d0ef30
|
@ -495,7 +495,7 @@ async fn update_in_memory_catalog(
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{db::test_helpers::write_lp, utils::make_db};
|
||||
use crate::{db::{test_helpers::write_lp, load::load_or_create_preserved_catalog, catalog::Catalog}, utils::make_db};
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkStorage,
|
||||
delete_predicate::{DeleteExpr, DeletePredicate},
|
||||
|
@ -720,10 +720,12 @@ mod tests {
|
|||
async fn test_compact_os_duplicates_and_hard_deletes() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
// -----------------------------------------------
|
||||
// Create 3 OS & 1 MUB chunks
|
||||
let db = make_db().await.db;
|
||||
let partition_key = "1970-01-01T00";
|
||||
write_lp(&db, "cpu,tag1=cupcakes bar=1 10");
|
||||
write_lp(&db, "cpu,tag1=cookies bar=2 10"); // delete
|
||||
write_lp(&db, "cpu,tag1=cookies bar=2 10"); // delete - pred1 happens before compaction
|
||||
|
||||
// persist chunk 1
|
||||
let _chunk_id_1 = db
|
||||
|
@ -734,8 +736,8 @@ mod tests {
|
|||
.id();
|
||||
//
|
||||
// persist chunk 2
|
||||
write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete
|
||||
write_lp(&db, "cpu,tag1=cookies bar=3 30"); // duplicate & delete
|
||||
write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - pred2 happens twice before and during compaction
|
||||
write_lp(&db, "cpu,tag1=cookies bar=3 30"); // duplicate & delete - pred3 happens after compaction
|
||||
write_lp(&db, "cpu,tag1=cupcakes bar=2 20");
|
||||
|
||||
let chunk_id_2 = db
|
||||
|
@ -746,7 +748,7 @@ mod tests {
|
|||
.id();
|
||||
//
|
||||
// persist chunk 3
|
||||
write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete
|
||||
write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - pred2 happens twice before and during compaction
|
||||
let _chunk_id_3 = db
|
||||
.persist_partition("cpu", partition_key, true)
|
||||
.await
|
||||
|
@ -758,26 +760,16 @@ mod tests {
|
|||
db.unload_read_buffer("cpu", partition_key, chunk_id_2)
|
||||
.unwrap();
|
||||
//
|
||||
// Delete all cookies
|
||||
let predicate = Arc::new(DeletePredicate {
|
||||
range: TimestampRange { start: 0, end: 30 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"tag1".to_string(),
|
||||
data_types::delete_predicate::Op::Eq,
|
||||
data_types::delete_predicate::Scalar::String("cookies".to_string()),
|
||||
)],
|
||||
});
|
||||
db.delete("cpu", predicate).unwrap();
|
||||
//
|
||||
// Add a MUB
|
||||
write_lp(db.as_ref(), "cpu,tag1=brownies,tag2=a bar=2 40");
|
||||
|
||||
// -----------------------------------------------
|
||||
// Verify results before OS compacting
|
||||
let partition = db.lockable_partition("cpu", partition_key).unwrap();
|
||||
let partition = partition.read();
|
||||
let chunks = LockablePartition::chunks(&partition);
|
||||
assert_eq!(chunks.len(), 4);
|
||||
// ensure all RUBs are unloaded
|
||||
// Ensure all RUBs are unloaded
|
||||
let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
|
||||
assert_eq!(summary_chunks.len(), 4);
|
||||
summary_chunks.sort_by_key(|c| c.storage);
|
||||
|
@ -810,21 +802,62 @@ mod tests {
|
|||
iox_metadata.partition_checkpoint
|
||||
};
|
||||
|
||||
// compact 3 contiguous chunks 1, 2, 3
|
||||
// -----------------------------------------------
|
||||
// Create 3 delete predicates that will delete all cookies in 3 different deletes
|
||||
let pred1 = Arc::new(DeletePredicate {
|
||||
range: TimestampRange { start: 0, end: 11 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"tag1".to_string(),
|
||||
data_types::delete_predicate::Op::Eq,
|
||||
data_types::delete_predicate::Scalar::String("cookies".to_string()),
|
||||
)],
|
||||
});
|
||||
let pred2 = Arc::new(DeletePredicate {
|
||||
range: TimestampRange { start: 12, end: 21 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"tag1".to_string(),
|
||||
data_types::delete_predicate::Op::Eq,
|
||||
data_types::delete_predicate::Scalar::String("cookies".to_string()),
|
||||
)],
|
||||
});
|
||||
let pred3 = Arc::new(DeletePredicate {
|
||||
range: TimestampRange { start: 22, end: 31 },
|
||||
exprs: vec![DeleteExpr::new(
|
||||
"tag1".to_string(),
|
||||
data_types::delete_predicate::Op::Eq,
|
||||
data_types::delete_predicate::Scalar::String("cookies".to_string()),
|
||||
)],
|
||||
});
|
||||
|
||||
|
||||
// -----------------------------------------------
|
||||
// Apply deletes pred1 and pred2 before compaction
|
||||
db.delete("cpu", pred1).unwrap();
|
||||
db.delete("cpu", pred2.clone()).unwrap();
|
||||
|
||||
// -----------------------------------------------
|
||||
// Compact 3 contiguous chunks 1, 2, 3
|
||||
let partition = partition.upgrade();
|
||||
let chunk1 = chunks[0].write();
|
||||
let chunk2 = chunks[1].write();
|
||||
let chunk3 = chunks[2].write();
|
||||
let compacted_chunk = compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3])
|
||||
.unwrap()
|
||||
.1
|
||||
.await
|
||||
.unwrap()
|
||||
// Start the OS compaction job but do not poll it yet
|
||||
let (_tracker, fut) = compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3])
|
||||
.unwrap();
|
||||
// Now apply deletes pred2 (again) and pred3
|
||||
db.delete("cpu", pred2).unwrap();
|
||||
db.delete("cpu", pred3.clone()).unwrap();
|
||||
// Now continue running compaction job
|
||||
let compacted_chunk = tokio::spawn(fut).await.unwrap().unwrap().unwrap();
|
||||
|
||||
// verify results
|
||||
// -----------------------------------------------
|
||||
// Verify results after compaction
|
||||
|
||||
// Verify MUB chunk still there but the other 3 OS chunks are now compacted into 1 OS chunks
|
||||
let partition = db.partition("cpu", partition_key).unwrap();
|
||||
let mut summary_chunks: Vec<_> = partition.read().chunk_summaries().collect();
|
||||
let partition = partition.read();
|
||||
let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
|
||||
|
||||
summary_chunks.sort_by_key(|c| c.storage);
|
||||
assert_eq!(summary_chunks.len(), 2);
|
||||
// MUB
|
||||
|
@ -834,7 +867,47 @@ mod tests {
|
|||
assert_eq!(summary_chunks[1].storage, ChunkStorage::ObjectStoreOnly);
|
||||
assert_eq!(summary_chunks[1].row_count, 2);
|
||||
|
||||
// verify partition checkpoint of the compacted chunk
|
||||
// Verify delete predicates in the in-memory catalog
|
||||
let check_closure = |catalog: &Catalog| {
|
||||
let catalog_chunks = catalog.chunks();
|
||||
assert_eq!(catalog_chunks.len(), 2);
|
||||
|
||||
let chunk = &catalog_chunks[0];
|
||||
let chunk = chunk.read();
|
||||
let del_preds1 = chunk.delete_predicates();
|
||||
let id1 = chunk.id();
|
||||
|
||||
let chunk = &catalog_chunks[1];
|
||||
let chunk = chunk.read();
|
||||
let del_preds2 = chunk.delete_predicates();
|
||||
//let id2 = chunk.id();
|
||||
|
||||
let compacted_delete_preds = {
|
||||
if id1 == summary_chunks[0].id { del_preds1 }
|
||||
else { del_preds2 }
|
||||
};
|
||||
|
||||
// Should only include pred3
|
||||
assert_eq!(compacted_delete_preds,vec![Arc::clone(&pred3)]);
|
||||
};
|
||||
check_closure(&db.catalog);
|
||||
|
||||
// Verify delete predicates in preserved catalog
|
||||
let metric_registry = Arc::new(metric::Registry::new());
|
||||
let (_preserved_catalog, catalog, _replay_plan) = load_or_create_preserved_catalog(
|
||||
&*db.name(),
|
||||
Arc::clone(&db.iox_object_store),
|
||||
metric_registry,
|
||||
Arc::clone(&db.time_provider),
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
check_closure(&catalog);
|
||||
|
||||
// verify partition checkpoint of the compacted chunk which should be the
|
||||
// one of the first OS chunk (earliest checkpoint)
|
||||
let chunk = compacted_chunk.unwrap();
|
||||
let parquet_chunk = chunk.parquet_chunk().unwrap();
|
||||
let iox_parquet_metadata = parquet_chunk.parquet_metadata();
|
||||
|
|
Loading…
Reference in New Issue