feat: instead of adding num_files and memory budget into the reason text column, let us create differnt columns for them. We will be able to filter them easily (#5742)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
84b10b28b2
commit
75ff805ee2
|
|
@ -154,7 +154,7 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
|
|||
debug!(?partition_id, compaction_type, "nothing to compact");
|
||||
}
|
||||
FilterResult::OverLimitFileNum {
|
||||
file_num,
|
||||
num_files,
|
||||
budget_bytes,
|
||||
} => {
|
||||
// We cannot compact this partition because its first set of overlapped files
|
||||
|
|
@ -163,20 +163,26 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
|
|||
?partition_id,
|
||||
?table_id,
|
||||
compaction_type,
|
||||
file_num,
|
||||
num_files,
|
||||
budget_bytes,
|
||||
file_num_limit = compactor.config.max_num_compacting_files,
|
||||
memory_budget_bytes = compactor.config.memory_budget_bytes,
|
||||
"skipped; over limit of number of files"
|
||||
);
|
||||
let reason = format!(
|
||||
"over limit of num_files. Needed num_files = {}, limit of num_iles = {}. Needed budget = {}, memory budget = {}",
|
||||
file_num, compactor.config.max_num_compacting_files, budget_bytes, compactor.config.memory_budget_bytes
|
||||
);
|
||||
record_skipped_compaction(partition_id, Arc::clone(&compactor), &reason).await;
|
||||
record_skipped_compaction(
|
||||
partition_id,
|
||||
Arc::clone(&compactor),
|
||||
"over limit of num_files",
|
||||
num_files,
|
||||
compactor.config.max_num_compacting_files,
|
||||
budget_bytes,
|
||||
compactor.config.memory_budget_bytes,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
FilterResult::OverBudget {
|
||||
budget_bytes: needed_bytes,
|
||||
num_files,
|
||||
} => {
|
||||
if needed_bytes <= compactor.config.memory_budget_bytes {
|
||||
// Required budget is larger than the remaining budget but smaller than
|
||||
|
|
@ -191,14 +197,20 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
|
|||
compaction_type,
|
||||
?needed_bytes,
|
||||
memory_budget_bytes = compactor.config.memory_budget_bytes,
|
||||
?num_files,
|
||||
limit_num_files = compactor.config.max_num_compacting_files,
|
||||
"skipped; over memory budget"
|
||||
);
|
||||
let reason = format!(
|
||||
"over memory budget. Needed budget = {}, memory budget = {}",
|
||||
needed_bytes, compactor.config.memory_budget_bytes
|
||||
);
|
||||
record_skipped_compaction(partition_id, Arc::clone(&compactor), &reason)
|
||||
.await;
|
||||
record_skipped_compaction(
|
||||
partition_id,
|
||||
Arc::clone(&compactor),
|
||||
"over memory budget",
|
||||
num_files,
|
||||
compactor.config.max_num_compacting_files,
|
||||
needed_bytes,
|
||||
compactor.config.memory_budget_bytes,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
FilterResult::Proceed {
|
||||
|
|
@ -252,11 +264,22 @@ async fn record_skipped_compaction(
|
|||
partition_id: PartitionId,
|
||||
compactor: Arc<Compactor>,
|
||||
reason: &str,
|
||||
num_files: usize,
|
||||
limit_num_files: usize,
|
||||
estimated_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
) {
|
||||
let mut repos = compactor.catalog.repositories().await;
|
||||
let record_skip = repos
|
||||
.partitions()
|
||||
.record_skipped_compaction(partition_id, reason)
|
||||
.record_skipped_compaction(
|
||||
partition_id,
|
||||
reason,
|
||||
num_files,
|
||||
limit_num_files,
|
||||
estimated_bytes,
|
||||
limit_bytes,
|
||||
)
|
||||
.await;
|
||||
if let Err(e) = record_skip {
|
||||
warn!(?partition_id, %e, "could not log skipped compaction");
|
||||
|
|
@ -818,10 +841,7 @@ pub mod tests {
|
|||
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
|
||||
assert_eq!(skipped_compactions.len(), 1);
|
||||
assert_eq!(skipped_compactions[0].partition_id, partition4.partition.id);
|
||||
assert_eq!(
|
||||
skipped_compactions[0].reason,
|
||||
"over memory budget. Needed budget = 15100, memory budget = 14350"
|
||||
);
|
||||
assert_eq!(skipped_compactions[0].reason, "over memory budget");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,11 +20,12 @@ pub(crate) struct FilteredFiles {
|
|||
pub(crate) enum FilterResult {
|
||||
NothingToCompact,
|
||||
OverLimitFileNum {
|
||||
file_num: usize,
|
||||
num_files: usize,
|
||||
budget_bytes: u64,
|
||||
},
|
||||
OverBudget {
|
||||
budget_bytes: u64,
|
||||
num_files: usize,
|
||||
},
|
||||
Proceed {
|
||||
files: Vec<CompactorParquetFile>,
|
||||
|
|
@ -150,7 +151,7 @@ fn filter_parquet_files_inner(
|
|||
// Cannot compact this partition because its first set of overlapped files
|
||||
// exceed the file limit
|
||||
return FilterResult::OverLimitFileNum {
|
||||
file_num: 1 + overlaps.len(),
|
||||
num_files: 1 + overlaps.len(),
|
||||
budget_bytes: estimated_file_bytes,
|
||||
};
|
||||
} else {
|
||||
|
|
@ -163,6 +164,7 @@ fn filter_parquet_files_inner(
|
|||
// Cannot compact this partition further with the given budget
|
||||
return FilterResult::OverBudget {
|
||||
budget_bytes: estimated_file_bytes,
|
||||
num_files: 1 + overlaps.len(),
|
||||
};
|
||||
} else {
|
||||
// Only compact the ones under the given budget
|
||||
|
|
@ -458,7 +460,10 @@ mod tests {
|
|||
|
||||
assert_eq!(
|
||||
filter_result,
|
||||
FilterResult::OverBudget { budget_bytes: 1176 }
|
||||
FilterResult::OverBudget {
|
||||
budget_bytes: 1176,
|
||||
num_files: 1
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -479,7 +484,10 @@ mod tests {
|
|||
|
||||
assert_eq!(
|
||||
filter_result,
|
||||
FilterResult::OverBudget { budget_bytes: 1176 }
|
||||
FilterResult::OverBudget {
|
||||
budget_bytes: 1176,
|
||||
num_files: 1
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -501,7 +509,7 @@ mod tests {
|
|||
assert_eq!(
|
||||
filter_result,
|
||||
FilterResult::OverLimitFileNum {
|
||||
file_num: 1,
|
||||
num_files: 1,
|
||||
budget_bytes: 1176
|
||||
}
|
||||
);
|
||||
|
|
@ -536,7 +544,7 @@ mod tests {
|
|||
assert_eq!(
|
||||
filter_result,
|
||||
FilterResult::OverLimitFileNum {
|
||||
file_num: 2,
|
||||
num_files: 2,
|
||||
budget_bytes: 2 * 1176
|
||||
}
|
||||
);
|
||||
|
|
|
|||
|
|
@ -920,6 +920,14 @@ pub struct SkippedCompaction {
|
|||
pub reason: String,
|
||||
/// when compaction was skipped
|
||||
pub skipped_at: Timestamp,
|
||||
/// estimated memory budget
|
||||
pub estimated_bytes: i64,
|
||||
/// limit on memory budget
|
||||
pub limit_bytes: i64,
|
||||
/// num files selected to compact
|
||||
pub num_files: i64,
|
||||
/// limit on num files
|
||||
pub limit_num_files: i64,
|
||||
}
|
||||
|
||||
/// Data object for a tombstone.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,5 @@
|
|||
ALTER TABLE IF EXISTS skipped_compactions
|
||||
ADD COLUMN IF NOT EXISTS num_files BIGINT DEFAULT NULL,
|
||||
ADD COLUMN IF NOT EXISTS limit_num_files BIGINT DEFAULT NULL,
|
||||
ADD COLUMN IF NOT EXISTS estimated_bytes BIGINT DEFAULT NULL,
|
||||
ADD COLUMN IF NOT EXISTS limit_bytes BIGINT DEFAULT NULL;
|
||||
|
|
@ -476,6 +476,10 @@ pub trait PartitionRepo: Send + Sync {
|
|||
&mut self,
|
||||
partition_id: PartitionId,
|
||||
reason: &str,
|
||||
num_files: usize,
|
||||
limit_num_files: usize,
|
||||
estimated_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
) -> Result<()>;
|
||||
|
||||
/// List the records of compacting a partition being skipped. This is mostly useful for testing.
|
||||
|
|
@ -1590,26 +1594,34 @@ pub(crate) mod test_helpers {
|
|||
);
|
||||
repos
|
||||
.partitions()
|
||||
.record_skipped_compaction(other_partition.id, "I am le tired")
|
||||
.record_skipped_compaction(other_partition.id, "I am le tired", 1, 2, 10, 20)
|
||||
.await
|
||||
.unwrap();
|
||||
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
|
||||
assert_eq!(skipped_compactions.len(), 1);
|
||||
assert_eq!(skipped_compactions[0].partition_id, other_partition.id);
|
||||
assert_eq!(skipped_compactions[0].reason, "I am le tired");
|
||||
assert_eq!(skipped_compactions[0].num_files, 1);
|
||||
assert_eq!(skipped_compactions[0].limit_num_files, 2);
|
||||
assert_eq!(skipped_compactions[0].estimated_bytes, 10);
|
||||
assert_eq!(skipped_compactions[0].limit_bytes, 20);
|
||||
|
||||
// Only save the last reason that any particular partition was skipped (really if the
|
||||
// partition appears in the skipped compactions, it shouldn't become a compaction candidate
|
||||
// again, but race conditions and all that)
|
||||
repos
|
||||
.partitions()
|
||||
.record_skipped_compaction(other_partition.id, "I'm on fire")
|
||||
.record_skipped_compaction(other_partition.id, "I'm on fire", 11, 12, 110, 120)
|
||||
.await
|
||||
.unwrap();
|
||||
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
|
||||
assert_eq!(skipped_compactions.len(), 1);
|
||||
assert_eq!(skipped_compactions[0].partition_id, other_partition.id);
|
||||
assert_eq!(skipped_compactions[0].reason, "I'm on fire");
|
||||
assert_eq!(skipped_compactions[0].num_files, 11);
|
||||
assert_eq!(skipped_compactions[0].limit_num_files, 12);
|
||||
assert_eq!(skipped_compactions[0].estimated_bytes, 110);
|
||||
assert_eq!(skipped_compactions[0].limit_bytes, 120);
|
||||
|
||||
// Test setting and reading the per-partition persistence numbers
|
||||
let partition = repos
|
||||
|
|
@ -3030,7 +3042,14 @@ pub(crate) mod test_helpers {
|
|||
// The compactor skipped compacting another_partition
|
||||
repos
|
||||
.partitions()
|
||||
.record_skipped_compaction(another_partition.id, "Not feeling up to it today")
|
||||
.record_skipped_compaction(
|
||||
another_partition.id,
|
||||
"Not feeling up to it today",
|
||||
1,
|
||||
2,
|
||||
10,
|
||||
20,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -3360,7 +3379,7 @@ pub(crate) mod test_helpers {
|
|||
// The compactor skipped compacting another_partition
|
||||
repos
|
||||
.partitions()
|
||||
.record_skipped_compaction(another_partition.id, "Secret reasons")
|
||||
.record_skipped_compaction(another_partition.id, "Secret reasons", 1, 2, 10, 20)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
|||
|
|
@ -852,6 +852,10 @@ impl PartitionRepo for MemTxn {
|
|||
&mut self,
|
||||
partition_id: PartitionId,
|
||||
reason: &str,
|
||||
num_files: usize,
|
||||
limit_num_files: usize,
|
||||
estimated_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
) -> Result<()> {
|
||||
let reason = reason.to_string();
|
||||
let skipped_at = Timestamp::from(self.time_provider.now());
|
||||
|
|
@ -865,11 +869,19 @@ impl PartitionRepo for MemTxn {
|
|||
Some(s) => {
|
||||
s.reason = reason;
|
||||
s.skipped_at = skipped_at;
|
||||
s.num_files = num_files as i64;
|
||||
s.limit_num_files = limit_num_files as i64;
|
||||
s.estimated_bytes = estimated_bytes as i64;
|
||||
s.limit_bytes = limit_bytes as i64;
|
||||
}
|
||||
None => stage.skipped_compactions.push(SkippedCompaction {
|
||||
partition_id,
|
||||
reason,
|
||||
skipped_at,
|
||||
num_files: num_files as i64,
|
||||
limit_num_files: limit_num_files as i64,
|
||||
estimated_bytes: estimated_bytes as i64,
|
||||
limit_bytes: limit_bytes as i64,
|
||||
}),
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -247,7 +247,7 @@ decorate!(
|
|||
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
|
||||
"partition_partition_info_by_id" = partition_info_by_id(&mut self, partition_id: PartitionId) -> Result<Option<PartitionInfo>>;
|
||||
"partition_update_sort_key" = update_sort_key(&mut self, partition_id: PartitionId, sort_key: &[&str]) -> Result<Partition>;
|
||||
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str) -> Result<()>;
|
||||
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize,estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
|
||||
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
|
||||
"partition_update_persisted_sequence_number" = update_persisted_sequence_number(&mut self, partition_id: PartitionId, sequence_number: SequenceNumber) -> Result<()>;
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1312,22 +1312,34 @@ RETURNING *;
|
|||
&mut self,
|
||||
partition_id: PartitionId,
|
||||
reason: &str,
|
||||
num_files: usize,
|
||||
limit_num_files: usize,
|
||||
estimated_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
) -> Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO skipped_compactions
|
||||
( partition_id, reason, skipped_at )
|
||||
( partition_id, reason, num_files, limit_num_files, estimated_bytes, limit_bytes, skipped_at )
|
||||
VALUES
|
||||
( $1, $2, extract(epoch from NOW()) )
|
||||
( $1, $2, $3, $4, $5, $6, extract(epoch from NOW()) )
|
||||
ON CONFLICT ( partition_id )
|
||||
DO UPDATE
|
||||
SET
|
||||
reason = EXCLUDED.reason,
|
||||
num_files = EXCLUDED.num_files,
|
||||
limit_num_files = EXCLUDED.limit_num_files,
|
||||
estimated_bytes = EXCLUDED.estimated_bytes,
|
||||
limit_bytes = EXCLUDED.limit_bytes,
|
||||
skipped_at = EXCLUDED.skipped_at;
|
||||
"#,
|
||||
)
|
||||
.bind(partition_id)
|
||||
.bind(partition_id) // $1
|
||||
.bind(reason)
|
||||
.bind(num_files as i64)
|
||||
.bind(limit_num_files as i64)
|
||||
.bind(estimated_bytes as i64)
|
||||
.bind(limit_bytes as i64)
|
||||
.execute(&mut self.inner)
|
||||
.await
|
||||
.context(interface::CouldNotRecordSkippedCompactionSnafu { partition_id })?;
|
||||
|
|
|
|||
Loading…
Reference in New Issue