fix: chunk dropping over lifecycle policy should also respect the preserved catalog
parent
71cb15f017
commit
77a9191a11
|
@ -104,9 +104,9 @@ pub trait LockablePartition: Sized + std::fmt::Display {
|
||||||
|
|
||||||
/// Drops a chunk from the partition
|
/// Drops a chunk from the partition
|
||||||
fn drop_chunk(
|
fn drop_chunk(
|
||||||
s: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
||||||
chunk_id: u32,
|
chunk: LifecycleWriteGuard<'_, <Self::Chunk as LockableChunk>::Chunk, Self::Chunk>,
|
||||||
) -> Result<(), Self::Error>;
|
) -> Result<TaskTracker<<Self::Chunk as LockableChunk>::Job>, Self::Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A `LockableChunk` is a wrapper around a `LifecycleChunk` that allows for
|
/// A `LockableChunk` is a wrapper around a `LifecycleChunk` that allows for
|
||||||
|
|
|
@ -134,11 +134,13 @@ where
|
||||||
FreeAction::Drop => match chunk.storage() {
|
FreeAction::Drop => match chunk.storage() {
|
||||||
ChunkStorage::ReadBuffer
|
ChunkStorage::ReadBuffer
|
||||||
| ChunkStorage::ClosedMutableBuffer => {
|
| ChunkStorage::ClosedMutableBuffer => {
|
||||||
LockablePartition::drop_chunk(
|
let tracker = LockablePartition::drop_chunk(
|
||||||
partition.upgrade(),
|
partition.upgrade(),
|
||||||
candidate.chunk_id,
|
chunk.upgrade(),
|
||||||
)
|
)
|
||||||
.expect("failed to drop")
|
.expect("failed to drop")
|
||||||
|
.with_metadata(ChunkLifecycleAction::Dropping);
|
||||||
|
self.trackers.push(tracker);
|
||||||
}
|
}
|
||||||
storage => warn!(
|
storage => warn!(
|
||||||
%db_name,
|
%db_name,
|
||||||
|
@ -858,12 +860,18 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn drop_chunk(
|
fn drop_chunk(
|
||||||
mut s: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
mut partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
||||||
chunk_id: u32,
|
chunk: LifecycleWriteGuard<'_, TestChunk, Self::Chunk>,
|
||||||
) -> Result<(), Self::Error> {
|
) -> Result<TaskTracker<()>, Self::Error> {
|
||||||
s.chunks.remove(&chunk_id);
|
let chunk_id = chunk.addr().chunk_id;
|
||||||
s.data().db.events.write().push(MoverEvents::Drop(chunk_id));
|
partition.chunks.remove(&chunk_id);
|
||||||
Ok(())
|
partition
|
||||||
|
.data()
|
||||||
|
.db
|
||||||
|
.events
|
||||||
|
.write()
|
||||||
|
.push(MoverEvents::Drop(chunk_id));
|
||||||
|
Ok(TaskTracker::complete(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -232,11 +232,13 @@ impl LockablePartition for LockableCatalogPartition {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn drop_chunk(
|
fn drop_chunk(
|
||||||
mut s: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
partition: LifecycleWriteGuard<'_, Self::Partition, Self>,
|
||||||
chunk_id: u32,
|
chunk: LifecycleWriteGuard<'_, CatalogChunk, Self::Chunk>,
|
||||||
) -> Result<(), Self::Error> {
|
) -> Result<TaskTracker<Job>, Self::Error> {
|
||||||
s.drop_chunk(chunk_id)?;
|
info!(table=%partition.table_name(), partition=%partition.partition_key(), chunk_id=chunk.addr().chunk_id, "drop chunk");
|
||||||
Ok(())
|
let (tracker, fut) = drop::drop_chunk(partition, chunk)?;
|
||||||
|
let _ = tokio::spawn(async move { fut.await.log_if_error("drop chunk") });
|
||||||
|
Ok(tracker)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue