test: test concurrent deletes and OS compaction
parent
4e86d0ef30
commit
0d6fefdd09
|
@ -495,7 +495,10 @@ async fn update_in_memory_catalog(
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{db::{test_helpers::write_lp, load::load_or_create_preserved_catalog, catalog::Catalog}, utils::make_db};
|
use crate::{
|
||||||
|
db::{load::load_or_create_preserved_catalog, test_helpers::write_lp},
|
||||||
|
utils::make_db,
|
||||||
|
};
|
||||||
use data_types::{
|
use data_types::{
|
||||||
chunk_metadata::ChunkStorage,
|
chunk_metadata::ChunkStorage,
|
||||||
delete_predicate::{DeleteExpr, DeletePredicate},
|
delete_predicate::{DeleteExpr, DeletePredicate},
|
||||||
|
@ -717,15 +720,16 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_compact_os_duplicates_and_hard_deletes() {
|
async fn test_compact_os_soft_and_hard_deletes() {
|
||||||
test_helpers::maybe_start_logging();
|
test_helpers::maybe_start_logging();
|
||||||
|
|
||||||
// -----------------------------------------------
|
// -----------------------------------------------
|
||||||
// Create 3 OS & 1 MUB chunks
|
// Create 3 OS & 1 MUB chunks
|
||||||
|
|
||||||
let db = make_db().await.db;
|
let db = make_db().await.db;
|
||||||
let partition_key = "1970-01-01T00";
|
let partition_key = "1970-01-01T00";
|
||||||
write_lp(&db, "cpu,tag1=cupcakes bar=1 10");
|
write_lp(&db, "cpu,tag1=cupcakes bar=1 10");
|
||||||
write_lp(&db, "cpu,tag1=cookies bar=2 10"); // delete - pred1 happens before compaction
|
write_lp(&db, "cpu,tag1=cookies bar=2 10"); // hard delete - pred1 happens before compaction
|
||||||
|
|
||||||
// persist chunk 1
|
// persist chunk 1
|
||||||
let _chunk_id_1 = db
|
let _chunk_id_1 = db
|
||||||
|
@ -734,10 +738,10 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.id();
|
.id();
|
||||||
//
|
|
||||||
// persist chunk 2
|
// persist chunk 2
|
||||||
write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - pred2 happens twice before and during compaction
|
write_lp(&db, "cpu,tag1=cookies bar=2 20"); // hard delete - pred2 happens twice before and during compaction
|
||||||
write_lp(&db, "cpu,tag1=cookies bar=3 30"); // duplicate & delete - pred3 happens after compaction
|
write_lp(&db, "cpu,tag1=cookies bar=3 30"); // soft delete - pred3 happens concurrently with compaction
|
||||||
write_lp(&db, "cpu,tag1=cupcakes bar=2 20");
|
write_lp(&db, "cpu,tag1=cupcakes bar=2 20");
|
||||||
|
|
||||||
let chunk_id_2 = db
|
let chunk_id_2 = db
|
||||||
|
@ -746,30 +750,34 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.id();
|
.id();
|
||||||
//
|
|
||||||
// persist chunk 3
|
// persist chunk 3
|
||||||
write_lp(&db, "cpu,tag1=cookies bar=2 20"); // delete - pred2 happens twice before and during compaction
|
write_lp(&db, "cpu,tag1=cookies bar=2 20"); // hard delete - pred2 happens twice before and during compaction
|
||||||
let _chunk_id_3 = db
|
let _chunk_id_3 = db
|
||||||
.persist_partition("cpu", partition_key, true)
|
.persist_partition("cpu", partition_key, true)
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.id();
|
.id();
|
||||||
//
|
|
||||||
// drop RUB of chunk_id_2
|
// drop RUB of chunk_id_2
|
||||||
|
// So chunk_id_1 and chunk_id_3 have both RUB and OS, chunk_id_2 only has OS
|
||||||
db.unload_read_buffer("cpu", partition_key, chunk_id_2)
|
db.unload_read_buffer("cpu", partition_key, chunk_id_2)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
//
|
|
||||||
// Add a MUB
|
// Add a MUB
|
||||||
write_lp(db.as_ref(), "cpu,tag1=brownies,tag2=a bar=2 40");
|
write_lp(db.as_ref(), "cpu,tag1=brownies,tag2=a bar=2 40");
|
||||||
|
|
||||||
// -----------------------------------------------
|
// -----------------------------------------------
|
||||||
// Verify results before OS compacting
|
// Verify results before OS compacting
|
||||||
|
|
||||||
|
// Acquire read lock on partition
|
||||||
let partition = db.lockable_partition("cpu", partition_key).unwrap();
|
let partition = db.lockable_partition("cpu", partition_key).unwrap();
|
||||||
let partition = partition.read();
|
let partition = partition.read();
|
||||||
let chunks = LockablePartition::chunks(&partition);
|
let chunks = LockablePartition::chunks(&partition);
|
||||||
assert_eq!(chunks.len(), 4);
|
assert_eq!(chunks.len(), 4);
|
||||||
// Ensure all RUBs are unloaded
|
|
||||||
|
// Verify chunk types
|
||||||
let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
|
let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
|
||||||
assert_eq!(summary_chunks.len(), 4);
|
assert_eq!(summary_chunks.len(), 4);
|
||||||
summary_chunks.sort_by_key(|c| c.storage);
|
summary_chunks.sort_by_key(|c| c.storage);
|
||||||
|
@ -787,7 +795,7 @@ mod tests {
|
||||||
assert_eq!(summary_chunks[2].row_count, 1); // chunk_id_3
|
assert_eq!(summary_chunks[2].row_count, 1); // chunk_id_3
|
||||||
assert_eq!(summary_chunks[3].storage, ChunkStorage::ObjectStoreOnly);
|
assert_eq!(summary_chunks[3].storage, ChunkStorage::ObjectStoreOnly);
|
||||||
assert_eq!(summary_chunks[3].row_count, 3); // chunk_id_2
|
assert_eq!(summary_chunks[3].row_count, 3); // chunk_id_2
|
||||||
|
|
||||||
// Get min partition checkpoint which is the checkpoint of the first chunk
|
// Get min partition checkpoint which is the checkpoint of the first chunk
|
||||||
let min_partition_checkpoint = {
|
let min_partition_checkpoint = {
|
||||||
let chunk = chunks[0].clone().chunk;
|
let chunk = chunks[0].clone().chunk;
|
||||||
|
@ -802,8 +810,12 @@ mod tests {
|
||||||
iox_metadata.partition_checkpoint
|
iox_metadata.partition_checkpoint
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// unlock partition
|
||||||
|
partition.into_data();
|
||||||
|
|
||||||
// -----------------------------------------------
|
// -----------------------------------------------
|
||||||
// Create 3 delete predicates that will delete all cookies in 3 different deletes
|
// Create 3 delete predicates that will delete all cookies in 3 different deletes
|
||||||
|
|
||||||
let pred1 = Arc::new(DeletePredicate {
|
let pred1 = Arc::new(DeletePredicate {
|
||||||
range: TimestampRange { start: 0, end: 11 },
|
range: TimestampRange { start: 0, end: 11 },
|
||||||
exprs: vec![DeleteExpr::new(
|
exprs: vec![DeleteExpr::new(
|
||||||
|
@ -829,69 +841,87 @@ mod tests {
|
||||||
)],
|
)],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
// -----------------------------------------------
|
// -----------------------------------------------
|
||||||
// Apply deletes pred1 and pred2 before compaction
|
// Apply deletes pred1 and pred2 before compaction
|
||||||
|
|
||||||
db.delete("cpu", pred1).unwrap();
|
db.delete("cpu", pred1).unwrap();
|
||||||
db.delete("cpu", pred2.clone()).unwrap();
|
db.delete(
|
||||||
|
"cpu",
|
||||||
|
Arc::<data_types::delete_predicate::DeletePredicate>::clone(&pred2),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// -----------------------------------------------
|
// -----------------------------------------------
|
||||||
// Compact 3 contiguous chunks 1, 2, 3
|
// Compact 3 contiguous chunks 1, 2, 3
|
||||||
let partition = partition.upgrade();
|
|
||||||
|
// acquire write lock to start compacting
|
||||||
|
let partition = db.lockable_partition("cpu", partition_key).unwrap();
|
||||||
|
let partition = partition.write();
|
||||||
let chunk1 = chunks[0].write();
|
let chunk1 = chunks[0].write();
|
||||||
let chunk2 = chunks[1].write();
|
let chunk2 = chunks[1].write();
|
||||||
let chunk3 = chunks[2].write();
|
let chunk3 = chunks[2].write();
|
||||||
|
|
||||||
// Start the OS compaction job but do not poll it yet
|
// Start the OS compaction job but do not poll it yet
|
||||||
let (_tracker, fut) = compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3])
|
let (_tracker, fut) =
|
||||||
.unwrap();
|
compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3]).unwrap();
|
||||||
// Now apply deletes pred2 (again) and pred3
|
|
||||||
db.delete("cpu", pred2).unwrap();
|
// Apply deletes pred2 (again) and pred3
|
||||||
db.delete("cpu", pred3.clone()).unwrap();
|
db.delete("cpu", pred2).unwrap(); // duplicate delete and will not have any effects
|
||||||
// Now continue running compaction job
|
db.delete(
|
||||||
|
"cpu",
|
||||||
|
Arc::<data_types::delete_predicate::DeletePredicate>::clone(&pred3),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Continue running compaction job
|
||||||
let compacted_chunk = tokio::spawn(fut).await.unwrap().unwrap().unwrap();
|
let compacted_chunk = tokio::spawn(fut).await.unwrap().unwrap().unwrap();
|
||||||
|
|
||||||
// -----------------------------------------------
|
// -----------------------------------------------
|
||||||
// Verify results after compaction
|
// Verify results after compaction
|
||||||
|
|
||||||
// Verify MUB chunk still there but the other 3 OS chunks are now compacted into 1 OS chunks
|
|
||||||
let partition = db.partition("cpu", partition_key).unwrap();
|
let partition = db.partition("cpu", partition_key).unwrap();
|
||||||
let partition = partition.read();
|
let partition = partition.read();
|
||||||
let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
|
let mut summary_chunks: Vec<_> = partition.chunk_summaries().collect();
|
||||||
|
|
||||||
|
// ----------
|
||||||
|
// Verify chunk types and number of rows in each
|
||||||
summary_chunks.sort_by_key(|c| c.storage);
|
summary_chunks.sort_by_key(|c| c.storage);
|
||||||
assert_eq!(summary_chunks.len(), 2);
|
assert_eq!(summary_chunks.len(), 2);
|
||||||
// MUB
|
// MUB still here
|
||||||
assert_eq!(summary_chunks[0].storage, ChunkStorage::OpenMutableBuffer);
|
assert_eq!(summary_chunks[0].storage, ChunkStorage::ClosedMutableBuffer);
|
||||||
assert_eq!(summary_chunks[0].row_count, 1);
|
assert_eq!(summary_chunks[0].row_count, 1);
|
||||||
// OS: the result of compacting all 3 persisted chunks
|
// OS: the result of compacting all 3 persisted chunks
|
||||||
assert_eq!(summary_chunks[1].storage, ChunkStorage::ObjectStoreOnly);
|
assert_eq!(summary_chunks[1].storage, ChunkStorage::ObjectStoreOnly);
|
||||||
assert_eq!(summary_chunks[1].row_count, 2);
|
// 2 rows + 1 soft deleted row due to pred3 happens during compaction
|
||||||
|
assert_eq!(summary_chunks[1].row_count, 3);
|
||||||
|
|
||||||
|
// ----------
|
||||||
// Verify delete predicates in the in-memory catalog
|
// Verify delete predicates in the in-memory catalog
|
||||||
let check_closure = |catalog: &Catalog| {
|
let catalog_chunks = &db.catalog.chunks();
|
||||||
let catalog_chunks = catalog.chunks();
|
// In-memory catalog should include 2 chunks: MUB and OS
|
||||||
assert_eq!(catalog_chunks.len(), 2);
|
assert_eq!(catalog_chunks.len(), 2);
|
||||||
|
|
||||||
let chunk = &catalog_chunks[0];
|
let chunk = &catalog_chunks[0];
|
||||||
let chunk = chunk.read();
|
let chunk = chunk.read();
|
||||||
let del_preds1 = chunk.delete_predicates();
|
let del_preds1 = chunk.delete_predicates();
|
||||||
let id1 = chunk.id();
|
let id1 = chunk.id();
|
||||||
|
|
||||||
let chunk = &catalog_chunks[1];
|
let chunk = &catalog_chunks[1];
|
||||||
let chunk = chunk.read();
|
let chunk = chunk.read();
|
||||||
let del_preds2 = chunk.delete_predicates();
|
let del_preds2 = chunk.delete_predicates();
|
||||||
//let id2 = chunk.id();
|
|
||||||
|
|
||||||
let compacted_delete_preds = {
|
// Get delete pred of the compacted chunk
|
||||||
if id1 == summary_chunks[0].id { del_preds1 }
|
let compacted_delete_preds = {
|
||||||
else { del_preds2 }
|
if id1 == summary_chunks[1].id {
|
||||||
};
|
del_preds1
|
||||||
|
} else {
|
||||||
// Should only include pred3
|
del_preds2
|
||||||
assert_eq!(compacted_delete_preds,vec![Arc::clone(&pred3)]);
|
}
|
||||||
};
|
};
|
||||||
check_closure(&db.catalog);
|
// should include pred3 as it happened concurrently with the compaction
|
||||||
|
assert_eq!(compacted_delete_preds, vec![Arc::clone(&pred3)]);
|
||||||
|
|
||||||
|
// ----------
|
||||||
// Verify delete predicates in preserved catalog
|
// Verify delete predicates in preserved catalog
|
||||||
let metric_registry = Arc::new(metric::Registry::new());
|
let metric_registry = Arc::new(metric::Registry::new());
|
||||||
let (_preserved_catalog, catalog, _replay_plan) = load_or_create_preserved_catalog(
|
let (_preserved_catalog, catalog, _replay_plan) = load_or_create_preserved_catalog(
|
||||||
|
@ -904,8 +934,21 @@ mod tests {
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
check_closure(&catalog);
|
|
||||||
|
|
||||||
|
// preserved catalog only has knowledge of the compacted OS chunk
|
||||||
|
let catalog_chunks = catalog.chunks();
|
||||||
|
assert_eq!(catalog_chunks.len(), 1);
|
||||||
|
let chunk = catalog_chunks[0].read();
|
||||||
|
let preserved_compacted_delete_preds = chunk.delete_predicates();
|
||||||
|
|
||||||
|
// The brand new compacted OS chunk should not have any delete predicates.
|
||||||
|
// Note that, by designed, predicates of deletes that happen concurrently with compacting OS are only
|
||||||
|
// kept in its in-memory catalog chunk (and will be added in the preserved catalog in the background later).
|
||||||
|
// The brand new compacted OS chunk does not include any delete predicates. All previous soft-delete
|
||||||
|
// predicates were already hard deleted during its OS compaction.
|
||||||
|
assert!(preserved_compacted_delete_preds.is_empty());
|
||||||
|
|
||||||
|
// ----------
|
||||||
// verify partition checkpoint of the compacted chunk which should be the
|
// verify partition checkpoint of the compacted chunk which should be the
|
||||||
// one of the first OS chunk (earliest checkpoint)
|
// one of the first OS chunk (earliest checkpoint)
|
||||||
let chunk = compacted_chunk.unwrap();
|
let chunk = compacted_chunk.unwrap();
|
||||||
|
@ -917,7 +960,7 @@ mod tests {
|
||||||
.read_iox_metadata()
|
.read_iox_metadata()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let compacted_partition_checkpoint = iox_metadata.partition_checkpoint;
|
let compacted_partition_checkpoint = iox_metadata.partition_checkpoint;
|
||||||
assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint);
|
assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
Loading…
Reference in New Issue