feat: add to-be-create-chunk-id for compacting OS chunks in CatalogChunk
parent
27560abd48
commit
ffc970a60f
|
@ -98,9 +98,7 @@ pub enum ChunkLifecycleAction {
|
|||
Compacting,
|
||||
|
||||
/// Object Store Chunk is in the process of being compacted
|
||||
/// The ChunkId is the ID of the new chunk that will replace this chunk
|
||||
/// after the compaction is completed
|
||||
CompactingObjectStore(ChunkId),
|
||||
CompactingObjectStore,
|
||||
|
||||
/// Chunk is about to be dropped from memory and (if persisted) from object store
|
||||
Dropping,
|
||||
|
@ -120,7 +118,7 @@ impl ChunkLifecycleAction {
|
|||
match self {
|
||||
Self::Persisting => "Persisting to Object Storage",
|
||||
Self::Compacting => "Compacting",
|
||||
Self::CompactingObjectStore(_chunk_id) => "Compacting Object Store",
|
||||
Self::CompactingObjectStore => "Compacting Object Store",
|
||||
Self::Dropping => "Dropping",
|
||||
Self::LoadingReadBuffer => "Loading to Read Buffer",
|
||||
}
|
||||
|
@ -149,6 +147,10 @@ pub struct ChunkSummary {
|
|||
/// Is there any outstanding lifecycle action for this chunk?
|
||||
pub lifecycle_action: Option<ChunkLifecycleAction>,
|
||||
|
||||
// todo: I am debating whether to add to_be_created_chunk_id here.
|
||||
// If we want this to fully reflect CatalogChunk, then we should add it.
|
||||
// However if we do, we also need to add it to management::Chunk API which I found maybe better to
|
||||
// go with the other solution https://github.com/influxdata/influxdb_iox/pull/3275#pullrequestreview-821179885
|
||||
/// The number of bytes used to store this chunk in memory
|
||||
pub memory_bytes: usize,
|
||||
|
||||
|
|
|
@ -64,9 +64,7 @@ impl From<Option<ChunkLifecycleAction>> for management::ChunkLifecycleAction {
|
|||
match lifecycle_action {
|
||||
Some(ChunkLifecycleAction::Persisting) => Self::Persisting,
|
||||
Some(ChunkLifecycleAction::Compacting) => Self::Compacting,
|
||||
Some(ChunkLifecycleAction::CompactingObjectStore(_chunk_id)) => {
|
||||
Self::CompactingObjectStore
|
||||
} // todo: use chunk_id
|
||||
Some(ChunkLifecycleAction::CompactingObjectStore) => Self::CompactingObjectStore,
|
||||
Some(ChunkLifecycleAction::Dropping) => Self::Dropping,
|
||||
Some(ChunkLifecycleAction::LoadingReadBuffer) => Self::LoadingReadBuffer,
|
||||
None => Self::Unspecified,
|
||||
|
@ -158,8 +156,7 @@ impl TryFrom<management::ChunkLifecycleAction> for Option<ChunkLifecycleAction>
|
|||
Ok(Some(ChunkLifecycleAction::Compacting))
|
||||
}
|
||||
management::ChunkLifecycleAction::CompactingObjectStore => {
|
||||
let chunk_id = ChunkId::new_test(1); // todo: need to replace 1 with a meaningful chunk_id
|
||||
Ok(Some(ChunkLifecycleAction::CompactingObjectStore(chunk_id)))
|
||||
Ok(Some(ChunkLifecycleAction::CompactingObjectStore))
|
||||
}
|
||||
management::ChunkLifecycleAction::LoadingReadBuffer => {
|
||||
Ok(Some(ChunkLifecycleAction::LoadingReadBuffer))
|
||||
|
|
|
@ -205,6 +205,15 @@ pub struct CatalogChunk {
|
|||
/// or even triggering graceful termination of it
|
||||
lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>,
|
||||
|
||||
/// Chunk Id that goes with ChunkLifecycleAction::CompactingObjectStore.
|
||||
/// This is the new chunkId if this chunk is compacted (with many other chunks) successfully and dropped.
|
||||
/// Since compacting object store chunks is a long process, we do not lock the whole process
|
||||
/// and deletes can happen during that time. This ID is needed to store delete predicates
|
||||
/// that happens during that time for it in a mailbox. The mailbox will be read
|
||||
/// in the background and update this chunk after it is completely created.
|
||||
/// See compact_object_store_chunks for the detail.
|
||||
to_be_created_chunk_id: Option<ChunkId>,
|
||||
|
||||
/// The metrics for this chunk
|
||||
///
|
||||
/// Wrapped in a mutex to allow updating metrics without exclusive access to CatalogChunk
|
||||
|
@ -295,6 +304,7 @@ impl CatalogChunk {
|
|||
addr,
|
||||
stage,
|
||||
lifecycle_action: None,
|
||||
to_be_created_chunk_id: None,
|
||||
metrics: Mutex::new(metrics),
|
||||
access_recorder: AccessRecorder::new(Arc::clone(&time_provider)),
|
||||
time_provider,
|
||||
|
@ -334,6 +344,7 @@ impl CatalogChunk {
|
|||
addr,
|
||||
stage,
|
||||
lifecycle_action: None,
|
||||
to_be_created_chunk_id: None,
|
||||
metrics: Mutex::new(metrics),
|
||||
access_recorder: AccessRecorder::new(Arc::clone(&time_provider)),
|
||||
time_provider,
|
||||
|
@ -377,6 +388,7 @@ impl CatalogChunk {
|
|||
addr,
|
||||
stage,
|
||||
lifecycle_action: None,
|
||||
to_be_created_chunk_id: None,
|
||||
metrics: Mutex::new(metrics),
|
||||
access_recorder: AccessRecorder::new(Arc::clone(&time_provider)),
|
||||
time_provider,
|
||||
|
@ -429,8 +441,8 @@ impl CatalogChunk {
|
|||
|
||||
pub fn in_lifecycle_compacting_object_store(&self) -> Option<ChunkId> {
|
||||
if let Some(task) = self.lifecycle_action.as_ref() {
|
||||
if let ChunkLifecycleAction::CompactingObjectStore(chunk_id) = task.metadata() {
|
||||
return Some(*chunk_id);
|
||||
if let ChunkLifecycleAction::CompactingObjectStore = task.metadata() {
|
||||
return self.to_be_created_chunk_id;
|
||||
}
|
||||
}
|
||||
None
|
||||
|
@ -737,7 +749,7 @@ impl CatalogChunk {
|
|||
pub fn set_compacting(&mut self, registration: &TaskRegistration) -> Result<()> {
|
||||
match &self.stage {
|
||||
ChunkStage::Open { .. } | ChunkStage::Frozen { .. } => {
|
||||
self.set_lifecycle_action(ChunkLifecycleAction::Compacting, registration)?;
|
||||
self.set_lifecycle_action(ChunkLifecycleAction::Compacting, None, registration)?;
|
||||
self.freeze()?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -774,7 +786,8 @@ impl CatalogChunk {
|
|||
}
|
||||
ChunkStage::Persisted { .. } => {
|
||||
self.set_lifecycle_action(
|
||||
ChunkLifecycleAction::CompactingObjectStore(compacted_chunk_id),
|
||||
ChunkLifecycleAction::CompactingObjectStore,
|
||||
Some(compacted_chunk_id),
|
||||
registration,
|
||||
)?;
|
||||
Ok(())
|
||||
|
@ -792,7 +805,7 @@ impl CatalogChunk {
|
|||
|
||||
match &self.stage {
|
||||
ChunkStage::Frozen { .. } => {
|
||||
self.set_lifecycle_action(ChunkLifecycleAction::Persisting, registration)?;
|
||||
self.set_lifecycle_action(ChunkLifecycleAction::Persisting, None, registration)?;
|
||||
Ok(())
|
||||
}
|
||||
_ => {
|
||||
|
@ -857,9 +870,11 @@ impl CatalogChunk {
|
|||
actual: "Persisted with ReadBuffer",
|
||||
}
|
||||
.fail(),
|
||||
None => {
|
||||
self.set_lifecycle_action(ChunkLifecycleAction::LoadingReadBuffer, registration)
|
||||
}
|
||||
None => self.set_lifecycle_action(
|
||||
ChunkLifecycleAction::LoadingReadBuffer,
|
||||
None,
|
||||
registration,
|
||||
),
|
||||
},
|
||||
_ => {
|
||||
unexpected_state!(self, "setting unload", "Persisted", &self.stage)
|
||||
|
@ -909,7 +924,7 @@ impl CatalogChunk {
|
|||
|
||||
/// Start lifecycle action that should result in the chunk being dropped from memory and (if persisted) from object store.
|
||||
pub fn set_dropping(&mut self, registration: &TaskRegistration) -> Result<()> {
|
||||
self.set_lifecycle_action(ChunkLifecycleAction::Dropping, registration)?;
|
||||
self.set_lifecycle_action(ChunkLifecycleAction::Dropping, None, registration)?;
|
||||
|
||||
// set memory metrics to 0 to stop accounting for this chunk within the catalog
|
||||
self.metrics.lock().memory_metrics.set_to_zero();
|
||||
|
@ -921,6 +936,7 @@ impl CatalogChunk {
|
|||
fn set_lifecycle_action(
|
||||
&mut self,
|
||||
lifecycle_action: ChunkLifecycleAction,
|
||||
to_be_created_chunk_id: Option<ChunkId>, // only needed if lifecycle_action is CompactingObjectStore
|
||||
registration: &TaskRegistration,
|
||||
) -> Result<()> {
|
||||
if let Some(lifecycle_action) = &self.lifecycle_action {
|
||||
|
@ -930,6 +946,13 @@ impl CatalogChunk {
|
|||
});
|
||||
}
|
||||
self.lifecycle_action = Some(registration.clone().into_tracker(lifecycle_action));
|
||||
if lifecycle_action == ChunkLifecycleAction::CompactingObjectStore {
|
||||
assert!(
|
||||
to_be_created_chunk_id.is_some(),
|
||||
"Compacting Object Store must go with a chunk ID"
|
||||
);
|
||||
self.to_be_created_chunk_id = to_be_created_chunk_id;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -950,6 +973,7 @@ impl CatalogChunk {
|
|||
}
|
||||
}
|
||||
self.lifecycle_action = None;
|
||||
self.to_be_created_chunk_id = None;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -965,6 +989,7 @@ impl CatalogChunk {
|
|||
});
|
||||
}
|
||||
self.lifecycle_action = None;
|
||||
self.to_be_created_chunk_id = None;
|
||||
|
||||
// Some lifecycle actions (e.g. Drop) modify the memory metrics so that the catalog accounts chunks w/
|
||||
// actions correctly. When clearing out that action, we need to restore the pre-action state. The easiest
|
||||
|
@ -1060,7 +1085,7 @@ mod tests {
|
|||
|
||||
// set some action
|
||||
chunk
|
||||
.set_lifecycle_action(ChunkLifecycleAction::Compacting, ®istration)
|
||||
.set_lifecycle_action(ChunkLifecycleAction::Compacting, None, ®istration)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
*chunk.lifecycle_action().unwrap().metadata(),
|
||||
|
@ -1070,7 +1095,7 @@ mod tests {
|
|||
// setting an action while there is one running fails
|
||||
assert_eq!(
|
||||
chunk
|
||||
.set_lifecycle_action(ChunkLifecycleAction::Compacting, ®istration)
|
||||
.set_lifecycle_action(ChunkLifecycleAction::Compacting, None, ®istration)
|
||||
.unwrap_err()
|
||||
.to_string(),
|
||||
"Internal Error: A lifecycle action \'Compacting\' is already in \
|
||||
|
@ -1104,7 +1129,7 @@ mod tests {
|
|||
|
||||
// now we can set another action
|
||||
chunk
|
||||
.set_lifecycle_action(ChunkLifecycleAction::Compacting, ®istration)
|
||||
.set_lifecycle_action(ChunkLifecycleAction::Compacting, None, ®istration)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
*chunk.lifecycle_action().unwrap().metadata(),
|
||||
|
@ -1122,7 +1147,7 @@ mod tests {
|
|||
|
||||
// set some action
|
||||
chunk
|
||||
.set_lifecycle_action(ChunkLifecycleAction::Compacting, ®istration)
|
||||
.set_lifecycle_action(ChunkLifecycleAction::Compacting, None, ®istration)
|
||||
.unwrap();
|
||||
|
||||
// clearing now fails because task is still in progress
|
||||
|
|
Loading…
Reference in New Issue