From 13de7ac954c1e45d9b123592bcf9b59154aa8542 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 9 Sep 2022 15:45:15 -0400 Subject: [PATCH] feat: Record reasons for skipping compaction of a partition in the database Closes #5458. --- compactor/src/compact_hot_partitions.rs | 56 +++++++++++++++++++++---- 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/compactor/src/compact_hot_partitions.rs b/compactor/src/compact_hot_partitions.rs index 49193e2d8e..b995c1e00f 100644 --- a/compactor/src/compact_hot_partitions.rs +++ b/compactor/src/compact_hot_partitions.rs @@ -198,8 +198,16 @@ async fn compact_candidates_with_memory_budget( compaction_type, "skipped due to missing column types of its table" ); - // todo: add this partition and its info into a new catalog table - // https://github.com/influxdata/influxdb_iox/issues/5458 + + let mut repos = compactor.catalog.repositories().await; + let record_skip = repos + .partitions() + .record_skipped_compaction(partition_id, "missing column types") + .await; + if let Err(e) = record_skip { + warn!(?partition_id, %e, "could not log skipped compaction"); + } + None } Some(columns) => { @@ -256,8 +264,18 @@ async fn compact_candidates_with_memory_budget( compaction_type, "skipped; error in estimating compacting memory" ); - // todo: add this partition and its info into a new catalog table - // https://github.com/influxdata/influxdb_iox/issues/5458 + + let mut repos = compactor.catalog.repositories().await; + let record_skip = repos + .partitions() + .record_skipped_compaction( + partition_id, + "error in estimating compacting memory", + ) + .await; + if let Err(e) = record_skip { + warn!(?partition_id, %e, "could not log skipped compaction"); + } } FilterResult::OverBudget => { if to_compact.budget_bytes() <= compactor.config.memory_budget_bytes { @@ -273,8 +291,14 @@ async fn compact_candidates_with_memory_budget( compaction_type, "skipped; over memory budget" ); - // todo: add this partition and its info into a new catalog table - // https://github.com/influxdata/influxdb_iox/issues/5458 + let mut repos = compactor.catalog.repositories().await; + let record_skip = repos + .partitions() + .record_skipped_compaction(partition_id, "over memory budget") + .await; + if let Err(e) = record_skip { + warn!(?partition_id, %e, "could not log skipped compaction"); + } } } FilterResult::Proceeed => { @@ -708,6 +732,15 @@ mod tests { sorted_candidates.sort_by_key(|c| c.candidate.partition_id); let sorted_candidates = sorted_candidates.into_iter().collect::>(); + { + let mut repos = compactor.catalog.repositories().await; + let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap(); + assert!( + skipped_compactions.is_empty(), + "Expected no skipped compactions, got: {skipped_compactions:?}" + ); + } + // There are 3 rounds of parallel compaction // . Round 1: 3 candidates [P1, P2, P5] and total needed budget 13,500 // . Round 2: 1 candidate [P6] and total needed budget 4,500 @@ -717,7 +750,8 @@ mod tests { // Todo next: So conveniently, debug log shows this is also a reproducer of // https://github.com/influxdata/conductor/issues/1130 - // "hot compaction failed: 1, "Could not serialize and persist record batches failed to peek record stream schema" + // "hot compaction failed: 1, "Could not serialize and persist record batches failed to + // peek record stream schema" compact_candidates_with_memory_budget( Arc::clone(&compactor), "hot", @@ -792,6 +826,14 @@ mod tests { let g3_candidate1_pf_ids: Vec<_> = g3_candidate1.files.iter().map(|pf| pf.id().get()).collect(); assert_eq!(g3_candidate1_pf_ids, vec![6, 5]); + + { + let mut repos = compactor.catalog.repositories().await; + let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap(); + assert_eq!(skipped_compactions.len(), 1); + assert_eq!(skipped_compactions[0].partition_id, partition4.partition.id); + assert_eq!(skipped_compactions[0].reason, "over memory budget"); + } } // A quite sophisticated integration test