From 77a9191a11fec4e26a6cf0ac0d063f4b6f460560 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 13 Jul 2021 13:27:29 +0200 Subject: [PATCH] fix: chunk dropping over lifecycle policy should also respect the preserved catalog --- lifecycle/src/lib.rs | 6 +++--- lifecycle/src/policy.rs | 24 ++++++++++++++++-------- server/src/db/lifecycle.rs | 12 +++++++----- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs index 0ac0f86564..36bf67c710 100644 --- a/lifecycle/src/lib.rs +++ b/lifecycle/src/lib.rs @@ -104,9 +104,9 @@ pub trait LockablePartition: Sized + std::fmt::Display { /// Drops a chunk from the partition fn drop_chunk( - s: LifecycleWriteGuard<'_, Self::Partition, Self>, - chunk_id: u32, - ) -> Result<(), Self::Error>; + partition: LifecycleWriteGuard<'_, Self::Partition, Self>, + chunk: LifecycleWriteGuard<'_, ::Chunk, Self::Chunk>, + ) -> Result::Job>, Self::Error>; } /// A `LockableChunk` is a wrapper around a `LifecycleChunk` that allows for diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 09cdb385c3..ffbd1fbc22 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -134,11 +134,13 @@ where FreeAction::Drop => match chunk.storage() { ChunkStorage::ReadBuffer | ChunkStorage::ClosedMutableBuffer => { - LockablePartition::drop_chunk( + let tracker = LockablePartition::drop_chunk( partition.upgrade(), - candidate.chunk_id, + chunk.upgrade(), ) .expect("failed to drop") + .with_metadata(ChunkLifecycleAction::Dropping); + self.trackers.push(tracker); } storage => warn!( %db_name, @@ -858,12 +860,18 @@ mod tests { } fn drop_chunk( - mut s: LifecycleWriteGuard<'_, Self::Partition, Self>, - chunk_id: u32, - ) -> Result<(), Self::Error> { - s.chunks.remove(&chunk_id); - s.data().db.events.write().push(MoverEvents::Drop(chunk_id)); - Ok(()) + mut partition: LifecycleWriteGuard<'_, Self::Partition, Self>, + chunk: LifecycleWriteGuard<'_, TestChunk, Self::Chunk>, + ) -> Result, Self::Error> { + let chunk_id = chunk.addr().chunk_id; + partition.chunks.remove(&chunk_id); + partition + .data() + .db + .events + .write() + .push(MoverEvents::Drop(chunk_id)); + Ok(TaskTracker::complete(())) } } diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 7aea95f88b..fb637cb624 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -232,11 +232,13 @@ impl LockablePartition for LockableCatalogPartition { } fn drop_chunk( - mut s: LifecycleWriteGuard<'_, Self::Partition, Self>, - chunk_id: u32, - ) -> Result<(), Self::Error> { - s.drop_chunk(chunk_id)?; - Ok(()) + partition: LifecycleWriteGuard<'_, Self::Partition, Self>, + chunk: LifecycleWriteGuard<'_, CatalogChunk, Self::Chunk>, + ) -> Result, Self::Error> { + info!(table=%partition.table_name(), partition=%partition.partition_key(), chunk_id=chunk.addr().chunk_id, "drop chunk"); + let (tracker, fut) = drop::drop_chunk(partition, chunk)?; + let _ = tokio::spawn(async move { fut.await.log_if_error("drop chunk") }); + Ok(tracker) } }