From 4e86d0ef30b0274b129e55dabc332b1107908183 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 8 Dec 2021 17:50:51 -0500 Subject: [PATCH] test: propagate delete tests for compact OS chunks --- .../src/db/lifecycle/compact_object_store.rs | 129 ++++++++++++++---- 1 file changed, 101 insertions(+), 28 deletions(-) diff --git a/server/src/db/lifecycle/compact_object_store.rs b/server/src/db/lifecycle/compact_object_store.rs index 41c7ca489d..0da0775518 100644 --- a/server/src/db/lifecycle/compact_object_store.rs +++ b/server/src/db/lifecycle/compact_object_store.rs @@ -495,7 +495,7 @@ async fn update_in_memory_catalog( #[cfg(test)] mod tests { use super::*; - use crate::{db::test_helpers::write_lp, utils::make_db}; + use crate::{db::{test_helpers::write_lp, load::load_or_create_preserved_catalog, catalog::Catalog}, utils::make_db}; use data_types::{ chunk_metadata::ChunkStorage, delete_predicate::{DeleteExpr, DeletePredicate}, @@ -720,10 +720,12 @@ mod tests { async fn test_compact_os_duplicates_and_hard_deletes() { test_helpers::maybe_start_logging(); + // ----------------------------------------------- + // Create 3 OS & 1 MUB chunks let db = make_db().await.db; let partition_key = "1970-01-01T00"; write_lp(&db, "cpu,tag1=cupcakes bar=1 10"); - write_lp(&db, "cpu,tag1=cookies bar=2 10"); // delete + write_lp(&db, "cpu,tag1=cookies bar=2 10"); // delete - pred1 happens before compaction // persist chunk 1 let _chunk_id_1 = db .persist_partition("cpu", partition_key, true) .await .unwrap() .unwrap() .id(); // // persist chunk 2 - write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - write_lp(&db, "cpu,tag1=cookies bar=3 30"); // duplicate & delete + write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - pred2 happens twice before and during compaction + write_lp(&db, "cpu,tag1=cookies bar=3 30"); // duplicate & delete - pred3 happens after compaction write_lp(&db, "cpu,tag1=cupcakes bar=2 20"); let chunk_id_2 = db .persist_partition("cpu", partition_key, true) .await .unwrap() .unwrap() .id(); // // persist chunk 3 - write_lp(&db, 
"cpu,tag1=cookies bar=2 20"); // delete + write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - pred2 happens twice before and during compaction let _chunk_id_3 = db .persist_partition("cpu", partition_key, true) .await @@ -758,26 +760,16 @@ mod tests { db.unload_read_buffer("cpu", partition_key, chunk_id_2) .unwrap(); // - // Delete all cookies - let predicate = Arc::new(DeletePredicate { - range: TimestampRange { start: 0, end: 30 }, - exprs: vec![DeleteExpr::new( - "tag1".to_string(), - data_types::delete_predicate::Op::Eq, - data_types::delete_predicate::Scalar::String("cookies".to_string()), - )], - }); - db.delete("cpu", predicate).unwrap(); - // // Add a MUB write_lp(db.as_ref(), "cpu,tag1=brownies,tag2=a bar=2 40"); + // ----------------------------------------------- // Verify results before OS compacting let partition = db.lockable_partition("cpu", partition_key).unwrap(); let partition = partition.read(); let chunks = LockablePartition::chunks(&partition); assert_eq!(chunks.len(), 4); - // ensure all RUBs are unloaded + // Ensure all RUBs are unloaded let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect(); assert_eq!(summary_chunks.len(), 4); summary_chunks.sort_by_key(|c| c.storage); @@ -795,7 +787,7 @@ mod tests { assert_eq!(summary_chunks[2].row_count, 1); // chunk_id_3 assert_eq!(summary_chunks[3].storage, ChunkStorage::ObjectStoreOnly); assert_eq!(summary_chunks[3].row_count, 3); // chunk_id_2 - + // Get min partition checkpoint which is the checkpoint of the first chunk let min_partition_checkpoint = { let chunk = chunks[0].clone().chunk; @@ -810,21 +802,62 @@ mod tests { iox_metadata.partition_checkpoint }; - // compact 3 contiguous chunks 1, 2, 3 + // ----------------------------------------------- + // Create 3 delete predicates that will delete all cookies in 3 different deletes + let pred1 = Arc::new(DeletePredicate { + range: TimestampRange { start: 0, end: 11 }, + exprs: vec![DeleteExpr::new( + "tag1".to_string(), + 
data_types::delete_predicate::Op::Eq, + data_types::delete_predicate::Scalar::String("cookies".to_string()), + )], + }); + let pred2 = Arc::new(DeletePredicate { + range: TimestampRange { start: 12, end: 21 }, + exprs: vec![DeleteExpr::new( + "tag1".to_string(), + data_types::delete_predicate::Op::Eq, + data_types::delete_predicate::Scalar::String("cookies".to_string()), + )], + }); + let pred3 = Arc::new(DeletePredicate { + range: TimestampRange { start: 22, end: 31 }, + exprs: vec![DeleteExpr::new( + "tag1".to_string(), + data_types::delete_predicate::Op::Eq, + data_types::delete_predicate::Scalar::String("cookies".to_string()), + )], + }); + + + // ----------------------------------------------- + // Apply deletes pred1 and pred2 before compaction + db.delete("cpu", pred1).unwrap(); + db.delete("cpu", pred2.clone()).unwrap(); + + // ----------------------------------------------- + // Compact 3 contiguous chunks 1, 2, 3 let partition = partition.upgrade(); let chunk1 = chunks[0].write(); let chunk2 = chunks[1].write(); let chunk3 = chunks[2].write(); - let compacted_chunk = compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3]) - .unwrap() - .1 - .await - .unwrap() + // Start the OS compaction job but do not poll it yet + let (_tracker, fut) = compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3]) .unwrap(); + // Now apply deletes pred2 (again) and pred3 + db.delete("cpu", pred2).unwrap(); + db.delete("cpu", pred3.clone()).unwrap(); + // Now continue running compaction job + let compacted_chunk = tokio::spawn(fut).await.unwrap().unwrap().unwrap(); - // verify results + // ----------------------------------------------- + // Verify results after compaction + + // Verify MUB chunk still there but the other 3 OS chunks are now compacted into 1 OS chunks let partition = db.partition("cpu", partition_key).unwrap(); - let mut summary_chunks: Vec<_> = partition.read().chunk_summaries().collect(); + let partition = partition.read(); + let mut 
summary_chunks: Vec<_> = partition.chunk_summaries().collect(); + summary_chunks.sort_by_key(|c| c.storage); assert_eq!(summary_chunks.len(), 2); // MUB @@ -833,8 +866,48 @@ mod tests { // OS: the result of compacting all 3 persisted chunks assert_eq!(summary_chunks[1].storage, ChunkStorage::ObjectStoreOnly); assert_eq!(summary_chunks[1].row_count, 2); + + // Verify delete predicates in the in-memory catalog + let check_closure = |catalog: &Catalog| { + let catalog_chunks = catalog.chunks(); + assert_eq!(catalog_chunks.len(), 2); - // verify partition checkpoint of the compacted chunk + let chunk = &catalog_chunks[0]; + let chunk = chunk.read(); + let del_preds1 = chunk.delete_predicates(); + let id1 = chunk.id(); + + let chunk = &catalog_chunks[1]; + let chunk = chunk.read(); + let del_preds2 = chunk.delete_predicates(); + //let id2 = chunk.id(); + + let compacted_delete_preds = { + if id1 == summary_chunks[0].id { del_preds1 } + else { del_preds2 } + }; + + // Should only include pred3 + assert_eq!(compacted_delete_preds,vec![Arc::clone(&pred3)]); + }; + check_closure(&db.catalog); + + // Verify delete predicates in preserved catalog + let metric_registry = Arc::new(metric::Registry::new()); + let (_preserved_catalog, catalog, _replay_plan) = load_or_create_preserved_catalog( + &*db.name(), + Arc::clone(&db.iox_object_store), + metric_registry, + Arc::clone(&db.time_provider), + false, + false, + ) + .await + .unwrap(); + check_closure(&catalog); + + // verify partition checkpoint of the compacted chunk which should be the + // one of the first OS chunk (earliest checkpoint) let chunk = compacted_chunk.unwrap(); let parquet_chunk = chunk.parquet_chunk().unwrap(); let iox_parquet_metadata = parquet_chunk.parquet_metadata(); @@ -844,7 +917,7 @@ mod tests { .read_iox_metadata() .unwrap(); let compacted_partition_checkpoint = iox_metadata.partition_checkpoint; - assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint); + 
assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint); } #[tokio::test]