diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 885cf59de1..13782d2ad5 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -154,7 +154,7 @@ async fn compact_candidates_with_memory_budget( debug!(?partition_id, compaction_type, "nothing to compact"); } FilterResult::OverLimitFileNum { - file_num, + num_files, budget_bytes, } => { // We cannot compact this partition because its first set of overlapped files @@ -163,20 +163,26 @@ async fn compact_candidates_with_memory_budget( ?partition_id, ?table_id, compaction_type, - file_num, + num_files, budget_bytes, file_num_limit = compactor.config.max_num_compacting_files, memory_budget_bytes = compactor.config.memory_budget_bytes, "skipped; over limit of number of files" ); - let reason = format!( - "over limit of num_files. Needed num_files = {}, limit of num_iles = {}. Needed budget = {}, memory budget = {}", - file_num, compactor.config.max_num_compacting_files, budget_bytes, compactor.config.memory_budget_bytes - ); - record_skipped_compaction(partition_id, Arc::clone(&compactor), &reason).await; + record_skipped_compaction( + partition_id, + Arc::clone(&compactor), + "over limit of num_files", + num_files, + compactor.config.max_num_compacting_files, + budget_bytes, + compactor.config.memory_budget_bytes, + ) + .await; } FilterResult::OverBudget { budget_bytes: needed_bytes, + num_files, } => { if needed_bytes <= compactor.config.memory_budget_bytes { // Required budget is larger than the remaining budget but smaller than @@ -191,14 +197,20 @@ async fn compact_candidates_with_memory_budget( compaction_type, ?needed_bytes, memory_budget_bytes = compactor.config.memory_budget_bytes, + ?num_files, + limit_num_files = compactor.config.max_num_compacting_files, "skipped; over memory budget" ); - let reason = format!( - "over memory budget. Needed budget = {}, memory budget = {}", - needed_bytes, compactor.config.memory_budget_bytes - ); - record_skipped_compaction(partition_id, Arc::clone(&compactor), &reason) - .await; + record_skipped_compaction( + partition_id, + Arc::clone(&compactor), + "over memory budget", + num_files, + compactor.config.max_num_compacting_files, + needed_bytes, + compactor.config.memory_budget_bytes, + ) + .await; } } FilterResult::Proceed { @@ -252,11 +264,22 @@ async fn record_skipped_compaction( partition_id: PartitionId, compactor: Arc, reason: &str, + num_files: usize, + limit_num_files: usize, + estimated_bytes: u64, + limit_bytes: u64, ) { let mut repos = compactor.catalog.repositories().await; let record_skip = repos .partitions() - .record_skipped_compaction(partition_id, reason) + .record_skipped_compaction( + partition_id, + reason, + num_files, + limit_num_files, + estimated_bytes, + limit_bytes, + ) .await; if let Err(e) = record_skip { warn!(?partition_id, %e, "could not log skipped compaction"); @@ -818,10 +841,7 @@ pub mod tests { let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap(); assert_eq!(skipped_compactions.len(), 1); assert_eq!(skipped_compactions[0].partition_id, partition4.partition.id); - assert_eq!( - skipped_compactions[0].reason, - "over memory budget. Needed budget = 15100, memory budget = 14350" - ); + assert_eq!(skipped_compactions[0].reason, "over memory budget"); } } diff --git a/compactor/src/parquet_file_filtering.rs b/compactor/src/parquet_file_filtering.rs index 612c96474c..44f296d5aa 100644 --- a/compactor/src/parquet_file_filtering.rs +++ b/compactor/src/parquet_file_filtering.rs @@ -20,11 +20,12 @@ pub(crate) struct FilteredFiles { pub(crate) enum FilterResult { NothingToCompact, OverLimitFileNum { - file_num: usize, + num_files: usize, budget_bytes: u64, }, OverBudget { budget_bytes: u64, + num_files: usize, }, Proceed { files: Vec, @@ -150,7 +151,7 @@ fn filter_parquet_files_inner( // Cannot compact this partition because its first set of overlapped files // exceed the file limit return FilterResult::OverLimitFileNum { - file_num: 1 + overlaps.len(), + num_files: 1 + overlaps.len(), budget_bytes: estimated_file_bytes, }; } else { @@ -163,6 +164,7 @@ fn filter_parquet_files_inner( // Cannot compact this partition further with the given budget return FilterResult::OverBudget { budget_bytes: estimated_file_bytes, + num_files: 1 + overlaps.len(), }; } else { // Only compact the ones under the given budget @@ -458,7 +460,10 @@ mod tests { assert_eq!( filter_result, - FilterResult::OverBudget { budget_bytes: 1176 } + FilterResult::OverBudget { + budget_bytes: 1176, + num_files: 1 + } ); } @@ -479,7 +484,10 @@ mod tests { assert_eq!( filter_result, - FilterResult::OverBudget { budget_bytes: 1176 } + FilterResult::OverBudget { + budget_bytes: 1176, + num_files: 1 + } ); } @@ -501,7 +509,7 @@ mod tests { assert_eq!( filter_result, FilterResult::OverLimitFileNum { - file_num: 1, + num_files: 1, budget_bytes: 1176 } ); @@ -536,7 +544,7 @@ mod tests { assert_eq!( filter_result, FilterResult::OverLimitFileNum { - file_num: 2, + num_files: 2, budget_bytes: 2 * 1176 } ); diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 98ee0ce9f3..f777502558 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -920,6 +920,14 @@ pub struct SkippedCompaction { pub reason: String, /// when compaction was skipped pub skipped_at: Timestamp, + /// estimated memory budget + pub estimated_bytes: i64, + /// limit on memory budget + pub limit_bytes: i64, + /// num files selected to compact + pub num_files: i64, + /// limit on num files + pub limit_num_files: i64, } /// Data object for a tombstone. diff --git a/iox_catalog/migrations/20220926182450_add_cols_to_skipped_compactions.sql b/iox_catalog/migrations/20220926182450_add_cols_to_skipped_compactions.sql new file mode 100644 index 0000000000..e35e0cd45f --- /dev/null +++ b/iox_catalog/migrations/20220926182450_add_cols_to_skipped_compactions.sql @@ -0,0 +1,5 @@ +ALTER TABLE IF EXISTS skipped_compactions + ADD COLUMN IF NOT EXISTS num_files BIGINT DEFAULT NULL, + ADD COLUMN IF NOT EXISTS limit_num_files BIGINT DEFAULT NULL, + ADD COLUMN IF NOT EXISTS estimated_bytes BIGINT DEFAULT NULL, + ADD COLUMN IF NOT EXISTS limit_bytes BIGINT DEFAULT NULL; \ No newline at end of file diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 28a48b8c4d..977b90df98 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -476,6 +476,10 @@ pub trait PartitionRepo: Send + Sync { &mut self, partition_id: PartitionId, reason: &str, + num_files: usize, + limit_num_files: usize, + estimated_bytes: u64, + limit_bytes: u64, ) -> Result<()>; /// List the records of compacting a partition being skipped. This is mostly useful for testing. @@ -1590,26 +1594,34 @@ pub(crate) mod test_helpers { ); repos .partitions() - .record_skipped_compaction(other_partition.id, "I am le tired") + .record_skipped_compaction(other_partition.id, "I am le tired", 1, 2, 10, 20) .await .unwrap(); let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap(); assert_eq!(skipped_compactions.len(), 1); assert_eq!(skipped_compactions[0].partition_id, other_partition.id); assert_eq!(skipped_compactions[0].reason, "I am le tired"); + assert_eq!(skipped_compactions[0].num_files, 1); + assert_eq!(skipped_compactions[0].limit_num_files, 2); + assert_eq!(skipped_compactions[0].estimated_bytes, 10); + assert_eq!(skipped_compactions[0].limit_bytes, 20); // Only save the last reason that any particular partition was skipped (really if the // partition appears in the skipped compactions, it shouldn't become a compaction candidate // again, but race conditions and all that) repos .partitions() - .record_skipped_compaction(other_partition.id, "I'm on fire") + .record_skipped_compaction(other_partition.id, "I'm on fire", 11, 12, 110, 120) .await .unwrap(); let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap(); assert_eq!(skipped_compactions.len(), 1); assert_eq!(skipped_compactions[0].partition_id, other_partition.id); assert_eq!(skipped_compactions[0].reason, "I'm on fire"); + assert_eq!(skipped_compactions[0].num_files, 11); + assert_eq!(skipped_compactions[0].limit_num_files, 12); + assert_eq!(skipped_compactions[0].estimated_bytes, 110); + assert_eq!(skipped_compactions[0].limit_bytes, 120); // Test setting and reading the per-partition persistence numbers let partition = repos @@ -3030,7 +3042,14 @@ pub(crate) mod test_helpers { // The compactor skipped compacting another_partition repos .partitions() - .record_skipped_compaction(another_partition.id, "Not feeling up to it today") + .record_skipped_compaction( + another_partition.id, + "Not feeling up to it today", + 1, + 2, + 10, + 20, + ) .await .unwrap(); @@ -3360,7 +3379,7 @@ pub(crate) mod test_helpers { // The compactor skipped compacting another_partition repos .partitions() - .record_skipped_compaction(another_partition.id, "Secret reasons") + .record_skipped_compaction(another_partition.id, "Secret reasons", 1, 2, 10, 20) .await .unwrap(); diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index e2febcc873..03753c8c8a 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -852,6 +852,10 @@ impl PartitionRepo for MemTxn { &mut self, partition_id: PartitionId, reason: &str, + num_files: usize, + limit_num_files: usize, + estimated_bytes: u64, + limit_bytes: u64, ) -> Result<()> { let reason = reason.to_string(); let skipped_at = Timestamp::from(self.time_provider.now()); @@ -865,11 +869,19 @@ impl PartitionRepo for MemTxn { Some(s) => { s.reason = reason; s.skipped_at = skipped_at; + s.num_files = num_files as i64; + s.limit_num_files = limit_num_files as i64; + s.estimated_bytes = estimated_bytes as i64; + s.limit_bytes = limit_bytes as i64; } None => stage.skipped_compactions.push(SkippedCompaction { partition_id, reason, skipped_at, + num_files: num_files as i64, + limit_num_files: limit_num_files as i64, + estimated_bytes: estimated_bytes as i64, + limit_bytes: limit_bytes as i64, }), } Ok(()) diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 2933ba8250..9f80ad0fe4 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -247,7 +247,7 @@ decorate!( "partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result>; "partition_partition_info_by_id" = partition_info_by_id(&mut self, partition_id: PartitionId) -> Result>; "partition_update_sort_key" = update_sort_key(&mut self, partition_id: PartitionId, sort_key: &[&str]) -> Result; - "partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str) -> Result<()>; + "partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize,estimated_bytes: u64, limit_bytes: u64) -> Result<()>; "partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result>; "partition_update_persisted_sequence_number" = update_persisted_sequence_number(&mut self, partition_id: PartitionId, sequence_number: SequenceNumber) -> Result<()>; ] diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 0033a81e0a..94f3725f19 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1312,22 +1312,34 @@ RETURNING *; &mut self, partition_id: PartitionId, reason: &str, + num_files: usize, + limit_num_files: usize, + estimated_bytes: u64, + limit_bytes: u64, ) -> Result<()> { sqlx::query( r#" INSERT INTO skipped_compactions - ( partition_id, reason, skipped_at ) + ( partition_id, reason, num_files, limit_num_files, estimated_bytes, limit_bytes, skipped_at ) VALUES - ( $1, $2, extract(epoch from NOW()) ) + ( $1, $2, $3, $4, $5, $6, extract(epoch from NOW()) ) ON CONFLICT ( partition_id ) DO UPDATE SET reason = EXCLUDED.reason, +num_files = EXCLUDED.num_files, +limit_num_files = EXCLUDED.limit_num_files, +estimated_bytes = EXCLUDED.estimated_bytes, +limit_bytes = EXCLUDED.limit_bytes, skipped_at = EXCLUDED.skipped_at; "#, ) - .bind(partition_id) + .bind(partition_id) // $1 .bind(reason) + .bind(num_files as i64) + .bind(limit_num_files as i64) + .bind(estimated_bytes as i64) + .bind(limit_bytes as i64) .execute(&mut self.inner) .await .context(interface::CouldNotRecordSkippedCompactionSnafu { partition_id })?;