diff --git a/server/src/db/lifecycle/compact_object_store.rs b/server/src/db/lifecycle/compact_object_store.rs
index 0da0775518..dccd84e106 100644
--- a/server/src/db/lifecycle/compact_object_store.rs
+++ b/server/src/db/lifecycle/compact_object_store.rs
@@ -495,7 +495,10 @@ async fn update_in_memory_catalog(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{db::{test_helpers::write_lp, load::load_or_create_preserved_catalog, catalog::Catalog}, utils::make_db};
+    use crate::{
+        db::{load::load_or_create_preserved_catalog, test_helpers::write_lp},
+        utils::make_db,
+    };
     use data_types::{
         chunk_metadata::ChunkStorage,
         delete_predicate::{DeleteExpr, DeletePredicate},
@@ -717,15 +720,16 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_compact_os_duplicates_and_hard_deletes() {
+    async fn test_compact_os_soft_and_hard_deletes() {
         test_helpers::maybe_start_logging();

         // -----------------------------------------------
         // Create 3 OS & 1 MUB chunks
+
         let db = make_db().await.db;
         let partition_key = "1970-01-01T00";
         write_lp(&db, "cpu,tag1=cupcakes bar=1 10");
-        write_lp(&db, "cpu,tag1=cookies bar=2 10"); // delete - pred1 happens before compaction
+        write_lp(&db, "cpu,tag1=cookies bar=2 10"); // hard delete - pred1 happens before compaction

         // persist chunk 1
         let _chunk_id_1 = db
@@ -734,10 +738,10 @@ mod tests {
             .unwrap()
             .unwrap()
             .id();
-        //
+
         // persist chunk 2
-        write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - pred2 happens twice before and during compaction
-        write_lp(&db, "cpu,tag1=cookies bar=3 30"); // duplicate & delete - pred3 happens after compaction
+        write_lp(&db, "cpu,tag1=cookies bar=2 20"); // hard delete - pred2 happens twice before and during compaction
+        write_lp(&db, "cpu,tag1=cookies bar=3 30"); // soft delete - pred3 happens concurrently with compaction
         write_lp(&db, "cpu,tag1=cupcakes bar=2 20");

         let chunk_id_2 = db
@@ -746,30 +750,34 @@ mod tests {
             .unwrap()
             .unwrap()
             .id();
-        //
+
         // persist chunk 3
-        write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - pred2 happens twice before and during compaction
+        write_lp(&db, "cpu,tag1=cookies bar=2 20"); // hard delete - pred2 happens twice before and during compaction
         let _chunk_id_3 = db
             .persist_partition("cpu", partition_key, true)
             .await
             .unwrap()
             .unwrap()
             .id();
-        //
+
+        // drop RUB of chunk_id_2
+        // So chunk_id_1 and chunk_id_3 have both RUB and OS, chunk_id_2 only has OS
         db.unload_read_buffer("cpu", partition_key, chunk_id_2)
             .unwrap();
-        //
+
+        // Add a MUB
         write_lp(db.as_ref(), "cpu,tag1=brownies,tag2=a bar=2 40");

         // -----------------------------------------------
         // Verify results before OS compacting
+
+        // Acquire read lock on partition
         let partition = db.lockable_partition("cpu", partition_key).unwrap();
         let partition = partition.read();
         let chunks = LockablePartition::chunks(&partition);
         assert_eq!(chunks.len(), 4);
-        // Ensure all RUBs are unloaded
+
+        // Verify chunk types
         let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
         assert_eq!(summary_chunks.len(), 4);
         summary_chunks.sort_by_key(|c| c.storage);
@@ -787,7 +795,7 @@ mod tests {
         assert_eq!(summary_chunks[2].row_count, 1); // chunk_id_3
         assert_eq!(summary_chunks[3].storage, ChunkStorage::ObjectStoreOnly);
         assert_eq!(summary_chunks[3].row_count, 3); // chunk_id_2
-
+
         // Get min partition checkpoint which is the checkpoint of the first chunk
         let min_partition_checkpoint = {
             let chunk = chunks[0].clone().chunk;
@@ -802,8 +810,12 @@ mod tests {
             iox_metadata.partition_checkpoint
         };

+        // unlock partition
+        partition.into_data();
+
         // -----------------------------------------------
         // Create 3 delete predicates that will delete all cookies in 3 different deletes
+
         let pred1 = Arc::new(DeletePredicate {
             range: TimestampRange { start: 0, end: 11 },
             exprs: vec![DeleteExpr::new(
@@ -829,69 +841,87 @@ mod tests {
             )],
         });

-
         // -----------------------------------------------
         // Apply deletes pred1 and pred2 before compaction
+
         db.delete("cpu", pred1).unwrap();
-        db.delete("cpu", pred2.clone()).unwrap();
+        db.delete(
+            "cpu",
+            Arc::<DeletePredicate>::clone(&pred2),
+        )
+        .unwrap();

         // -----------------------------------------------
         // Compact 3 contiguous chunks 1, 2, 3
-        let partition = partition.upgrade();
+
+        // acquire write lock to start compacting
+        let partition = db.lockable_partition("cpu", partition_key).unwrap();
+        let partition = partition.write();
         let chunk1 = chunks[0].write();
         let chunk2 = chunks[1].write();
         let chunk3 = chunks[2].write();
+
         // Start the OS compaction job but do not poll it yet
-        let (_tracker, fut) = compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3])
-            .unwrap();
-        // Now apply deletes pred2 (again) and pred3
-        db.delete("cpu", pred2).unwrap();
-        db.delete("cpu", pred3.clone()).unwrap();
-        // Now continue running compaction job
+        let (_tracker, fut) =
+            compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3]).unwrap();
+
+        // Apply deletes pred2 (again) and pred3
+        db.delete("cpu", pred2).unwrap(); // duplicate delete that will not have any effect
+        db.delete(
+            "cpu",
+            Arc::<DeletePredicate>::clone(&pred3),
+        )
+        .unwrap();
+
+        // Continue running the compaction job
         let compacted_chunk = tokio::spawn(fut).await.unwrap().unwrap().unwrap();

         // -----------------------------------------------
         // Verify results after compaction
-        // Verify MUB chunk still there but the other 3 OS chunks are now compacted into 1 OS chunks
         let partition = db.partition("cpu", partition_key).unwrap();
         let partition = partition.read();
         let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
-
+
+        // ----------
+        // Verify chunk types and number of rows in each
         summary_chunks.sort_by_key(|c| c.storage);
         assert_eq!(summary_chunks.len(), 2);
-        // MUB
-        assert_eq!(summary_chunks[0].storage, ChunkStorage::OpenMutableBuffer);
+        // MUB still here
+        assert_eq!(summary_chunks[0].storage, ChunkStorage::ClosedMutableBuffer);
         assert_eq!(summary_chunks[0].row_count, 1);

         // OS: the result of compacting all 3 persisted chunks
         assert_eq!(summary_chunks[1].storage, ChunkStorage::ObjectStoreOnly);
-        assert_eq!(summary_chunks[1].row_count, 2);
-
+        // 2 rows + 1 soft-deleted row because pred3 happened during compaction
+        assert_eq!(summary_chunks[1].row_count, 3);
+
+        // ----------
         // Verify delete predicates in the in-memory catalog
-        let check_closure = |catalog: &Catalog| {
-            let catalog_chunks = catalog.chunks();
-            assert_eq!(catalog_chunks.len(), 2);
+        let catalog_chunks = &db.catalog.chunks();
+        // In-memory catalog should include 2 chunks: MUB and OS
+        assert_eq!(catalog_chunks.len(), 2);

-            let chunk = &catalog_chunks[0];
-            let chunk = chunk.read();
-            let del_preds1 = chunk.delete_predicates();
-            let id1 = chunk.id();
+        let chunk = &catalog_chunks[0];
+        let chunk = chunk.read();
+        let del_preds1 = chunk.delete_predicates();
+        let id1 = chunk.id();

-            let chunk = &catalog_chunks[1];
-            let chunk = chunk.read();
-            let del_preds2 = chunk.delete_predicates();
-            //let id2 = chunk.id();
+        let chunk = &catalog_chunks[1];
+        let chunk = chunk.read();
+        let del_preds2 = chunk.delete_predicates();

-            let compacted_delete_preds = {
-                if id1 == summary_chunks[0].id { del_preds1 }
-                else { del_preds2 }
-            };
-
-            // Should only include pred3
-            assert_eq!(compacted_delete_preds,vec![Arc::clone(&pred3)]);
+        // Get delete pred of the compacted chunk
+        let compacted_delete_preds = {
+            if id1 == summary_chunks[1].id {
+                del_preds1
+            } else {
+                del_preds2
+            }
         };
-        check_closure(&db.catalog);
+        // Should include pred3, as it happened concurrently with the compaction
+        assert_eq!(compacted_delete_preds, vec![Arc::clone(&pred3)]);

+        // ----------
         // Verify delete predicates in preserved catalog
         let metric_registry = Arc::new(metric::Registry::new());
         let (_preserved_catalog, catalog, _replay_plan) = load_or_create_preserved_catalog(
@@ -904,8 +934,21 @@ mod tests {
         )
         .await
         .unwrap();
-        check_closure(&catalog);

+        // preserved catalog only has knowledge of the compacted OS chunk
+        let catalog_chunks = catalog.chunks();
+        assert_eq!(catalog_chunks.len(), 1);
+        let chunk = catalog_chunks[0].read();
+        let preserved_compacted_delete_preds = chunk.delete_predicates();
+
+        // The brand-new compacted OS chunk should not have any delete predicates: all
+        // previous soft-delete predicates were already hard deleted during the OS compaction.
+        // Note that, by design, predicates of deletes that happen concurrently with OS
+        // compaction are kept only in the in-memory catalog chunk (they will be added to
+        // the preserved catalog in the background later).
+        assert!(preserved_compacted_delete_preds.is_empty());
+
+        // ----------
         // verify partition checkpoint of the compacted chunk which should be the
         // one of the first OS chunk (earliest checkpoint)
         let chunk = compacted_chunk.unwrap();
@@ -917,7 +960,7 @@ mod tests {
             .read_iox_metadata()
             .unwrap();
         let compacted_partition_checkpoint = iox_metadata.partition_checkpoint;
-        assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint);
+        assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint);
     }

     #[tokio::test]
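
Reviewer note: the ordering this test relies on comes from Rust futures being lazy. `compact_object_store_chunks` is called while the partition and chunk write locks are held, but it only returns a tracker and an un-polled future; the compaction job does not run until that future is spawned, which is the window in which pred2 (again) and pred3 land "concurrently" with compaction. Below is a minimal sketch of that pattern using plain tokio rather than IOx code (the `log` vector and its messages are invented for illustration; requires tokio's `macros` and `rt` features):

use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let log = Arc::new(Mutex::new(Vec::<&str>::new()));

    // Build the "compaction" future. Rust futures are inert: nothing runs yet.
    let log_for_job = Arc::clone(&log);
    let fut = async move {
        log_for_job.lock().unwrap().push("compaction ran");
    };

    // A "delete" issued after the job was created but before it is polled.
    log.lock().unwrap().push("delete applied");

    // Only now is the job spawned and driven to completion.
    tokio::spawn(fut).await.unwrap();

    assert_eq!(*log.lock().unwrap(), ["delete applied", "compaction ran"]);
}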