* feat: skip over fully persisted partitions (#1962) * chore: review feedbackpull/24376/head
parent
d35b74c226
commit
5a0caeab44
|
@ -158,6 +158,9 @@ pub trait LockableChunk: Sized {
|
|||
pub trait LifecyclePartition {
|
||||
fn partition_key(&self) -> &str;
|
||||
|
||||
/// Returns true if all chunks in the partition are persisted.
|
||||
fn is_persisted(&self) -> bool;
|
||||
|
||||
/// Returns an approximation of the number of rows that can be persisted
|
||||
fn persistable_row_count(&self) -> usize;
|
||||
|
||||
|
|
|
@ -194,6 +194,10 @@ where
|
|||
|
||||
// TODO: Encapsulate locking into a CatalogTransaction type
|
||||
let partition = partition.read();
|
||||
if partition.is_persisted() {
|
||||
debug!(db_name = %self.db.name(), %partition, "nothing to be compacted for partition");
|
||||
return;
|
||||
}
|
||||
|
||||
let mut chunks = LockablePartition::chunks(&partition);
|
||||
// Sort by chunk ID to ensure a stable lock order
|
||||
|
@ -298,6 +302,11 @@ where
|
|||
// TODO: Encapsulate locking into a CatalogTransaction type
|
||||
let partition = partition.read();
|
||||
|
||||
if partition.is_persisted() {
|
||||
debug!(%db_name, %partition, "nothing to persist for partition");
|
||||
return false;
|
||||
}
|
||||
|
||||
let persistable_age_seconds = partition
|
||||
.minimum_unpersisted_age()
|
||||
.and_then(|minimum_unpersisted_age| {
|
||||
|
@ -305,11 +314,10 @@ where
|
|||
// started and this check is done, the duration may be
|
||||
// negative. Skip persistence in this case to avoid
|
||||
// panic in `duration_since`
|
||||
if minimum_unpersisted_age <= now {
|
||||
Some(now.duration_since(minimum_unpersisted_age).as_secs())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
Some(
|
||||
now.checked_duration_since(minimum_unpersisted_age)?
|
||||
.as_secs(),
|
||||
)
|
||||
})
|
||||
.unwrap_or_default() as u32;
|
||||
|
||||
|
@ -898,6 +906,10 @@ mod tests {
|
|||
"test"
|
||||
}
|
||||
|
||||
fn is_persisted(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn persistable_row_count(&self) -> usize {
|
||||
self.persistable_row_count
|
||||
}
|
||||
|
|
|
@ -262,6 +262,12 @@ impl LifecyclePartition for Partition {
|
|||
self.key()
|
||||
}
|
||||
|
||||
fn is_persisted(&self) -> bool {
|
||||
self.persistence_windows()
|
||||
.map(|w| w.minimum_unpersisted_age().is_none())
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
fn persistable_row_count(&self) -> usize {
|
||||
self.persistence_windows()
|
||||
.map(|w| w.persistable_row_count())
|
||||
|
|
Loading…
Reference in New Issue