feat: Record reasons for skipping compaction of a partition in the database

Closes #5458.
pull/24376/head
Carol (Nichols || Goulding) 2022-09-09 15:45:15 -04:00
parent c92aebd595
commit 13de7ac954
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 49 additions and 7 deletions

View File

@ -198,8 +198,16 @@ async fn compact_candidates_with_memory_budget<C, Fut, F>(
compaction_type,
"skipped due to missing column types of its table"
);
// todo: add this partition and its info into a new catalog table
// https://github.com/influxdata/influxdb_iox/issues/5458
let mut repos = compactor.catalog.repositories().await;
let record_skip = repos
.partitions()
.record_skipped_compaction(partition_id, "missing column types")
.await;
if let Err(e) = record_skip {
warn!(?partition_id, %e, "could not log skipped compaction");
}
None
}
Some(columns) => {
@ -256,8 +264,18 @@ async fn compact_candidates_with_memory_budget<C, Fut, F>(
compaction_type,
"skipped; error in estimating compacting memory"
);
// todo: add this partition and its info into a new catalog table
// https://github.com/influxdata/influxdb_iox/issues/5458
let mut repos = compactor.catalog.repositories().await;
let record_skip = repos
.partitions()
.record_skipped_compaction(
partition_id,
"error in estimating compacting memory",
)
.await;
if let Err(e) = record_skip {
warn!(?partition_id, %e, "could not log skipped compaction");
}
}
FilterResult::OverBudget => {
if to_compact.budget_bytes() <= compactor.config.memory_budget_bytes {
@ -273,8 +291,14 @@ async fn compact_candidates_with_memory_budget<C, Fut, F>(
compaction_type,
"skipped; over memory budget"
);
// todo: add this partition and its info into a new catalog table
// https://github.com/influxdata/influxdb_iox/issues/5458
let mut repos = compactor.catalog.repositories().await;
let record_skip = repos
.partitions()
.record_skipped_compaction(partition_id, "over memory budget")
.await;
if let Err(e) = record_skip {
warn!(?partition_id, %e, "could not log skipped compaction");
}
}
}
FilterResult::Proceeed => {
@ -708,6 +732,15 @@ mod tests {
sorted_candidates.sort_by_key(|c| c.candidate.partition_id);
let sorted_candidates = sorted_candidates.into_iter().collect::<VecDeque<_>>();
{
let mut repos = compactor.catalog.repositories().await;
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
assert!(
skipped_compactions.is_empty(),
"Expected no skipped compactions, got: {skipped_compactions:?}"
);
}
// There are 3 rounds of parallel compaction
// . Round 1: 3 candidates [P1, P2, P5] and total needed budget 13,500
// . Round 2: 1 candidate [P6] and total needed budget 4,500
@ -717,7 +750,8 @@ mod tests {
// Todo next: So conveniently, debug log shows this is also a reproducer of
// https://github.com/influxdata/conductor/issues/1130
// "hot compaction failed: 1, "Could not serialize and persist record batches failed to peek record stream schema"
// "hot compaction failed: 1, "Could not serialize and persist record batches failed to
// peek record stream schema"
compact_candidates_with_memory_budget(
Arc::clone(&compactor),
"hot",
@ -792,6 +826,14 @@ mod tests {
let g3_candidate1_pf_ids: Vec<_> =
g3_candidate1.files.iter().map(|pf| pf.id().get()).collect();
assert_eq!(g3_candidate1_pf_ids, vec![6, 5]);
{
let mut repos = compactor.catalog.repositories().await;
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
assert_eq!(skipped_compactions.len(), 1);
assert_eq!(skipped_compactions[0].partition_id, partition4.partition.id);
assert_eq!(skipped_compactions[0].reason, "over memory budget");
}
}
// A quite sophisticated integration test