diff --git a/compactor/src/compact_hot_partitions.rs b/compactor/src/compact_hot_partitions.rs
index ad610cbd0c..6f7c06a82f 100644
--- a/compactor/src/compact_hot_partitions.rs
+++ b/compactor/src/compact_hot_partitions.rs
@@ -307,10 +307,101 @@ mod tests {
     use backoff::BackoffConfig;
     use data_types::{ColumnType, ColumnTypeCount, CompactionLevel};
     use iox_query::exec::Executor;
-    use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable};
+    use iox_tests::util::{
+        TestCatalog, TestNamespace, TestParquetFileBuilder, TestShard, TestTable,
+    };
     use iox_time::SystemProvider;
     use parquet_file::storage::ParquetStorage;
-    use std::{collections::VecDeque, sync::Arc, time::Duration};
+    use std::{
+        collections::VecDeque,
+        pin::Pin,
+        sync::{Arc, Mutex},
+        time::Duration,
+    };
+
+    #[tokio::test]
+    async fn empty_candidates_compacts_nothing() {
+        test_helpers::maybe_start_logging();
+
+        let TestSetup {
+            compactor,
+            mock_compactor,
+            ..
+        } = test_setup().await;
+
+        let sorted_candidates = VecDeque::new();
+        let table_columns = HashMap::new();
+
+        compact_hot_partition_candidates(
+            Arc::clone(&compactor),
+            mock_compactor.compaction_function(),
+            sorted_candidates,
+            table_columns,
+        )
+        .await;
+
+        let compaction_groups = mock_compactor.results();
+        assert!(compaction_groups.is_empty());
+    }
+
+    #[tokio::test]
+    async fn compacting_table_without_columns_does_nothing() {
+        test_helpers::maybe_start_logging();
+
+        let TestSetup {
+            compactor,
+            mock_compactor,
+            namespace,
+            shard,
+            ..
+        } = test_setup().await;
+
+        let table_without_columns = namespace.create_table("test_table").await;
+
+        let partition1 = table_without_columns
+            .with_shard(&shard)
+            .create_partition("one")
+            .await;
+
+        let hot_time_one_hour_ago =
+            (compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos();
+
+        let pf1_1 = TestParquetFileBuilder::default()
+            .with_min_time(1)
+            .with_max_time(5)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::Initial)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition1.create_parquet_file_catalog_record(pf1_1).await;
+
+        let candidates = compactor
+            .hot_partitions_to_compact(
+                compactor.config.max_number_partitions_per_shard(),
+                compactor
+                    .config
+                    .min_number_recent_ingested_files_per_partition(),
+            )
+            .await
+            .unwrap();
+        assert_eq!(candidates.len(), 1);
+
+        let candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
+        let mut sorted_candidates = candidates.into_iter().collect::<Vec<_>>();
+        sorted_candidates.sort_by_key(|c| c.candidate.partition_id);
+        let sorted_candidates = sorted_candidates.into_iter().collect::<VecDeque<_>>();
+        let table_columns = HashMap::new();
+
+        compact_hot_partition_candidates(
+            Arc::clone(&compactor),
+            mock_compactor.compaction_function(),
+            sorted_candidates,
+            table_columns,
+        )
+        .await;
+
+        let compaction_groups = mock_compactor.results();
+        assert!(compaction_groups.is_empty());
+    }
 
     #[tokio::test]
     async fn test_compact_hot_partition_candidates() {
@@ -318,6 +409,7 @@ mod tests {
 
         let TestSetup {
             compactor,
+            mock_compactor,
             shard,
             table,
             ..
@@ -496,34 +588,18 @@ mod tests {
         // P4 is not compacted due to overbudget
         // Debug info shows all 3 rounds
 
-        let compaction_groups = Arc::new(std::sync::Mutex::new(vec![]));
-
-        let compaction_groups_for_closure = Arc::clone(&compaction_groups);
-
-        let mock_compaction =
-            move |_compactor: Arc<Compactor>,
-                  parallel_compacting_candidates: Vec<FilteredFiles>| {
-                let compaction_groups_for_async = Arc::clone(&compaction_groups_for_closure);
-                async move {
-                    compaction_groups_for_async
-                        .lock()
-                        .unwrap()
-                        .push(parallel_compacting_candidates);
-                }
-            };
-
         // Todo next: So conveniently, debug log shows this is also a reproducer of
         // https://github.com/influxdata/conductor/issues/1130
         // "hot compaction failed: 1, "Could not serialize and persist record batches failed to peek record stream schema"
         compact_hot_partition_candidates(
             Arc::clone(&compactor),
-            mock_compaction,
+            mock_compactor.compaction_function(),
             sorted_candidates,
             table_columns,
         )
         .await;
 
-        let compaction_groups = compaction_groups.lock().unwrap();
+        let compaction_groups = mock_compactor.results();
 
         // 3 rounds of parallel compaction
         assert_eq!(compaction_groups.len(), 3);
@@ -576,6 +652,47 @@ mod tests {
         assert_eq!(g3_candidate1_pf_ids, vec![6, 5]);
     }
 
+    #[derive(Default)]
+    struct MockCompactor {
+        compaction_groups: Arc<Mutex<Vec<Vec<FilteredFiles>>>>,
+    }
+
+    type CompactorFunctionFactory = Box<
+        dyn Fn(
+                Arc<Compactor>,
+                Vec<FilteredFiles>,
+            ) -> Pin<Box<dyn futures::Future<Output = ()> + Send>>
+            + Send
+            + Sync
+            + 'static,
+    >;
+
+    impl MockCompactor {
+        fn compaction_function(&self) -> CompactorFunctionFactory {
+            let compaction_groups_for_closure = Arc::clone(&self.compaction_groups);
+            Box::new(
+                move |_compactor: Arc<Compactor>,
+                      parallel_compacting_candidates: Vec<FilteredFiles>| {
+                    let compaction_groups_for_async = Arc::clone(&compaction_groups_for_closure);
+                    Box::pin(async move {
+                        compaction_groups_for_async
+                            .lock()
+                            .unwrap()
+                            .push(parallel_compacting_candidates);
+                    })
+                },
+            )
+        }
+
+        fn results(self) -> Vec<Vec<FilteredFiles>> {
+            let Self { compaction_groups } = self;
+            Arc::try_unwrap(compaction_groups)
+                .unwrap()
+                .into_inner()
+                .unwrap()
+        }
+    }
+
     fn make_compactor_config() -> CompactorConfig {
         let max_desired_file_size_bytes = 100_000_000;
         let percentage_max_file_size = 90;
@@ -603,6 +720,8 @@ mod tests {
 
     struct TestSetup {
         compactor: Arc<Compactor>,
+        mock_compactor: MockCompactor,
+        namespace: Arc<TestNamespace>,
         shard: Arc<TestShard>,
         table: Arc<TestTable>,
     }
@@ -642,8 +761,12 @@ mod tests {
             Arc::new(metric::Registry::new()),
         ));
 
+        let mock_compactor = MockCompactor::default();
+
         TestSetup {
             compactor,
+            mock_compactor,
+            namespace,
             shard,
             table,
         }
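
The MockCompactor this diff extracts is a record-and-replay test double: compaction_function() hands out a boxed closure so the callback has a single nameable type, and results(self) consumes the mock so the shared Arc can be unwrapped once the closure is gone. What follows is a standalone sketch of the same pattern under invented names (MockSink, RecordingFn, plain i32 payloads) rather than the iox types; it is an illustration of the shape, not code from the repo.

use std::{
    fmt::Debug,
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
};

// Same shape as the CompactorFunctionFactory alias in the diff: callable many
// times (Fn), shareable across threads, returning a type-erased future.
type RecordingFn<T> =
    Box<dyn Fn(Vec<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>;

#[derive(Default)]
struct MockSink<T> {
    calls: Arc<Mutex<Vec<Vec<T>>>>,
}

impl<T: Send + Debug + 'static> MockSink<T> {
    fn recording_fn(&self) -> RecordingFn<T> {
        let calls = Arc::clone(&self.calls);
        Box::new(move |batch: Vec<T>| {
            let calls = Arc::clone(&calls);
            // Box::pin erases the anonymous `async move` type so every
            // invocation returns the same nameable future type.
            Box::pin(async move {
                calls.lock().unwrap().push(batch);
            })
        })
    }

    // Consuming `self` (after the closure is dropped) leaves exactly one Arc
    // reference, so try_unwrap can return the recorded calls by value.
    fn results(self) -> Vec<Vec<T>> {
        Arc::try_unwrap(self.calls).unwrap().into_inner().unwrap()
    }
}

#[tokio::main]
async fn main() {
    let sink = MockSink::<i32>::default();
    let record = sink.recording_fn();
    record(vec![1, 2]).await;
    record(vec![3]).await;
    drop(record); // release the closure's Arc clone before unwrapping
    assert_eq!(sink.results(), vec![vec![1, 2], vec![3]]);
}

Consuming self in results is the detail that makes Arc::try_unwrap safe here: by the time a test asks for the recorded groups, the driver has already dropped the only other clone of the closure, exactly as in the tests above.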
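
The boxed factory also composes with a driver that is generic over its compaction function, the way compact_hot_partition_candidates is: Box<dyn Fn(...)> itself implements Fn, with the pinned box serving as the concrete future type. A minimal stand-in driver, reusing the hypothetical MockSink from the sketch above in place of its main (drive and its bounds are invented for illustration):

use std::future::Future;

// Hypothetical driver, generic over its compaction function the way
// compact_hot_partition_candidates is generic over `compact_function`.
async fn drive<C, Fut>(compact: C, groups: Vec<Vec<i32>>)
where
    C: Fn(Vec<i32>) -> Fut + Send + Sync,
    Fut: Future<Output = ()> + Send,
{
    for group in groups {
        // Each group is handed to the callback, as the real driver hands a
        // round of parallel-compaction candidates to its function.
        compact(group).await;
    }
}

#[tokio::main]
async fn main() {
    let sink = MockSink::<i32>::default();
    // Box<dyn Fn> satisfies the generic `C` bound; the concrete `Fut` is
    // Pin<Box<dyn Future<Output = ()> + Send>>.
    drive(sink.recording_fn(), vec![vec![1], vec![2, 3]]).await;
    // `drive` took the closure by value and dropped it, so results() works.
    assert_eq!(sink.results(), vec![vec![1], vec![2, 3]]);
}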