diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs
index d00283ab38..bdc7c87ac9 100644
--- a/clap_blocks/src/compactor.rs
+++ b/clap_blocks/src/compactor.rs
@@ -69,13 +69,14 @@ pub struct CompactorConfig {
     )]
     pub split_percentage: u16,
 
-    /// The compactor will limit the number of simultaneous compaction jobs based on the
-    /// size of the input files to be compacted. This number should be less than 1/10th
-    /// of the available memory to ensure compactions have
-    /// enough space to run.
-    /// Default is 1,073,741,824 bytes (1GB).
-    /// The number of compact_partititons run in parallel is determined by:
-    /// max_concurrent_size_bytes/input_size_threshold_bytes
+    /// The compactor will limit the number of simultaneous hot partition compaction jobs based on
+    /// the size of the input files to be compacted. This number should be less than 1/10th of the
+    /// available memory to ensure compactions have enough space to run.
+    ///
+    /// Default is 1024 * 1024 * 1024 = 1,073,741,824 bytes (1GB).
+    //
+    // The number of compact_hot_partitions run in parallel is determined by:
+    // max_concurrent_size_bytes/input_size_threshold_bytes
     #[clap(
         long = "--compaction-concurrent-size-bytes",
         env = "INFLUXDB_IOX_COMPACTION_CONCURRENT_SIZE_BYTES",
@@ -84,6 +85,22 @@ pub struct CompactorConfig {
     )]
     pub max_concurrent_size_bytes: u64,
 
+    /// The compactor will limit the number of simultaneous cold partition compaction jobs based on
+    /// the size of the input files to be compacted. This number should be less than 1/10th of the
+    /// available memory to ensure compactions have enough space to run.
+    ///
+    /// Default is 1024 * 1024 * 900 = 943,718,400 bytes (900MB).
+    //
+    // The number of compact_cold_partitions run in parallel is determined by:
+    // max_cold_concurrent_size_bytes/cold_input_size_threshold_bytes
+    #[clap(
+        long = "--compaction-cold-concurrent-size-bytes",
+        env = "INFLUXDB_IOX_COMPACTION_COLD_CONCURRENT_SIZE_BYTES",
+        default_value = "943718400",
+        action
+    )]
+    pub max_cold_concurrent_size_bytes: u64,
+
     /// Max number of partitions per sequencer we want to compact per cycle
     /// Default: 1
     #[clap(
@@ -104,14 +121,14 @@ pub struct CompactorConfig {
     )]
     pub min_number_recent_ingested_files_per_partition: usize,
 
-    /// A compaction operation will gather as many L0 files with their overlapping L1 files to
-    /// compact together until the total size of input files crosses this threshold. Later
-    /// compactions will pick up the remaining L0 files.
+    /// A compaction operation for hot partitions will gather as many L0 files with their
+    /// overlapping L1 files to compact together until the total size of input files crosses this
+    /// threshold. Later compactions will pick up the remaining L0 files.
     ///
     /// A compaction operation will be limited by this or by the file count threshold, whichever is
     /// hit first.
     ///
-    /// Default is 1024 * 1024 * 100 = 100,048,576 (100MB).
+    /// Default is 1024 * 1024 * 100 = 104,857,600 bytes (100MB).
     #[clap(
         long = "--compaction-input-size-threshold-bytes",
         env = "INFLUXDB_IOX_COMPACTION_INPUT_SIZE_THRESHOLD_BYTES",
@@ -120,6 +137,19 @@ pub struct CompactorConfig {
     )]
     pub input_size_threshold_bytes: u64,
 
+    /// A compaction operation for cold partitions will gather as many L0 files with their
+    /// overlapping L1 files to compact together until the total size of input files crosses this
+    /// threshold. Later compactions will pick up the remaining L0 files.
+    ///
+    /// Default is 1024 * 1024 * 600 = 629,145,600 bytes (600MB).
+    #[clap(
+        long = "--compaction-cold-input-size-threshold-bytes",
+        env = "INFLUXDB_IOX_COMPACTION_COLD_INPUT_SIZE_THRESHOLD_BYTES",
+        default_value = "629145600",
+        action
+    )]
+    pub cold_input_size_threshold_bytes: u64,
+
     /// A compaction operation will gather as many L0 files with their overlapping L1 files to
     /// compact together until the total number of L0 + L1 files crosses this threshold. Later
     /// compactions will pick up the remaining L0 files.
@@ -135,4 +165,17 @@ pub struct CompactorConfig {
         action
     )]
     pub input_file_count_threshold: usize,
+
+    /// The multiple of times that compacting hot partitions should run for every one time that
+    /// compacting cold partitions runs. Set to 1 to compact hot partitions and cold partitions
+    /// equally.
+    ///
+    /// Default is 4.
+    #[clap(
+        long = "--compaction-hot-multiple",
+        env = "INFLUXDB_IOX_COMPACTION_HOT_MULTIPLE",
+        default_value = "4",
+        action
+    )]
+    pub hot_multiple: usize,
 }
diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 0455de866f..8e929010d1 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -238,7 +238,7 @@ impl Compactor {
     /// * In all cases above, for each sequencer, N partitions with the most new ingested files
     ///   will be selected and the return list will include at most, P = N * S, partitions where S
     ///   is the number of sequencers this compactor handles.
-    pub async fn partitions_to_compact(
+    pub async fn hot_partitions_to_compact(
         &self,
         // Max number of the most recent highest ingested throughput partitions
         // per sequencer we want to read
@@ -252,8 +252,10 @@ impl Compactor {
         let mut repos = self.catalog.repositories().await;
 
         for sequencer_id in &self.sequencers {
-            let attributes =
-                Attributes::from([("sequencer_id", format!("{}", *sequencer_id).into())]);
+            let attributes = Attributes::from([
+                ("sequencer_id", format!("{}", *sequencer_id).into()),
+                ("partition_type", "hot".into()),
+            ]);
 
             // Get the most recent highest ingested throughput partitions within
             // the last 4 hours. If nothing, increase to 24 hours
@@ -285,35 +287,57 @@ impl Compactor {
                 }
             }
 
-            // No active ingesting partitions the last 24 hours,
-            // get partition with the most level-0 files
-            if num_partitions == 0 {
-                debug!(
-                    sequencer_id = sequencer_id.get(),
-                    "no active ingesting partitions",
-                );
+            // Record metric for candidates per sequencer
+            debug!(
+                sequencer_id = sequencer_id.get(),
+                n = num_partitions,
+                "hot compaction candidates",
+            );
+            let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
+            number_gauge.set(num_partitions as u64);
+        }
 
-                let mut partitions = repos
-                    .parquet_files()
-                    .most_level_0_files_partitions(*sequencer_id, max_num_partitions_per_sequencer)
-                    .await
-                    .context(MostL0PartitionsSnafu {
-                        sequencer_id: *sequencer_id,
-                    })?;
+        Ok(candidates)
+    }
 
-                if !partitions.is_empty() {
-                    num_partitions = partitions.len();
-                    candidates.append(&mut partitions);
-                }
-            }
+    /// Return a list of partitions that:
+    ///
+    /// - Have not received any writes in 24 hours (determined by all parquet files having a
+    ///   created_at time older than 24 hours ago)
+    /// - Have some level 0 parquet files that need to be upgraded or compacted
+    pub async fn cold_partitions_to_compact(
+        &self,
+        // Max number of cold partitions per sequencer we want to compact
+        max_num_partitions_per_sequencer: usize,
+    ) -> Result<Vec<PartitionParam>> {
+        let mut candidates =
+            Vec::with_capacity(self.sequencers.len() * max_num_partitions_per_sequencer);
+        let mut repos = self.catalog.repositories().await;
+
+        for sequencer_id in &self.sequencers {
+            let attributes = Attributes::from([
+                ("sequencer_id", format!("{}", *sequencer_id).into()),
+                ("partition_type", "cold".into()),
+            ]);
+
+            let mut partitions = repos
+                .parquet_files()
+                .most_level_0_files_partitions(*sequencer_id, 24, max_num_partitions_per_sequencer)
+                .await
+                .context(MostL0PartitionsSnafu {
+                    sequencer_id: *sequencer_id,
+                })?;
+
+            let num_partitions = partitions.len();
+            candidates.append(&mut partitions);
 
             // Record metric for candidates per sequencer
             debug!(
                 sequencer_id = sequencer_id.get(),
                 n = num_partitions,
-                "compaction candidates",
+                "cold compaction candidates",
             );
-            let number_gauge = self.compaction_candidate_gauge.recorder(attributes.clone());
+            let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
             number_gauge.set(num_partitions as u64);
         }
 
@@ -454,7 +478,7 @@ mod tests {
     use uuid::Uuid;
 
     #[tokio::test]
-    async fn test_candidate_partitions() {
+    async fn test_hot_partitions_to_compact() {
         let catalog = TestCatalog::new();
 
         // Create a db with 2 sequencers, one with 4 empty partitions and the other one with one
@@ -465,7 +489,12 @@ mod tests {
         let pool = txn.query_pools().create_or_get("foo").await.unwrap();
         let namespace = txn
             .namespaces()
-            .create("namespace_candidate_partitions", "inf", kafka.id, pool.id)
+            .create(
+                "namespace_hot_partitions_to_compact",
+                "inf",
+                kafka.id,
+                pool.id,
+            )
             .await
             .unwrap();
         let table = txn
@@ -542,9 +571,6 @@ mod tests {
         // Some times in the past to set to created_at of the files
         let time_now = Timestamp::new(compactor.time_provider.now().timestamp_nanos());
-        let _time_one_hour_ago = Timestamp::new(
-            (compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos(),
-        );
         let time_three_hour_ago = Timestamp::new(
             (compactor.time_provider.now() - Duration::from_secs(60 * 60 * 3)).timestamp_nanos(),
         );
@@ -573,7 +599,7 @@ mod tests {
         };
 
         // Note: The order of the test cases below is important and should not be changed
-        // becasue they depend on the order of the writes and their content. For example,
+        // because they depend on the order of the writes and their content. For example,
         // in order to test `Case 3`, we do not need to add asserts for `Case 1` and `Case 2`,
         // but all the writes, deletes and updates in Cases 1 and 2 are a must for testing Case 3.
         // In order words, the last Case needs all content of previous tests.
         // This shows the priority of selecting compaction candidates
 
         // --------------------------------------
         // Case 1: no files yet --> no partition candidates
         //
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert!(candidates.is_empty());
 
         // --------------------------------------
@@ -606,13 +632,14 @@ mod tests {
             .unwrap();
         txn.commit().await.unwrap();
         // No non-deleted level 0 files yet --> no candidates
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert!(candidates.is_empty());
 
         // --------------------------------------
-        // Case 3: no new recent writes (within the last 24 hours) --> return candidates with the most L0
+        // Case 3: no new recent writes (within the last 24 hours) --> no partition candidates
+        // (the cold case will pick them up)
         //
-        // partition2 has an old (more 24 hours ago) non-deleted level 0 file
+        // partition2 has an old (more than 24 hours ago) non-deleted level 0 file
         let mut txn = catalog.catalog.start_transaction().await.unwrap();
         let p3 = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
@@ -622,11 +649,10 @@ mod tests {
         };
         let _pf3 = txn.parquet_files().create(p3).await.unwrap();
         txn.commit().await.unwrap();
-        //
-        // Has at least one partition with a L0 file --> make it a candidate
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
-        assert_eq!(candidates.len(), 1);
-        assert_eq!(candidates[0].partition_id, partition2.id);
+
+        // No hot candidates
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
+        assert!(candidates.is_empty());
 
         // --------------------------------------
         // Case 4: has one partition with recent writes (5 hours ago) --> return that partition
@@ -643,7 +669,7 @@ mod tests {
         txn.commit().await.unwrap();
         //
         // Has at least one partition with a recent write --> make it a candidate
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert_eq!(candidates.len(), 1);
         assert_eq!(candidates[0].partition_id, partition4.id);
 
@@ -665,28 +691,28 @@ mod tests {
         txn.commit().await.unwrap();
         //
         // make partitions in the most recent group candidates
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert_eq!(candidates.len(), 1);
         assert_eq!(candidates[0].partition_id, partition3.id);
 
         // --------------------------------------
-        // Case 6: has partittion candidates for 2 sequecers
+        // Case 6: has partition candidates for 2 sequencers
         //
-        // The another_sequencer now has non-deleted level-0 file ingested 38 hours ago
+        // The another_sequencer now has a non-deleted level-0 file ingested 5 hours ago
         let mut txn = catalog.catalog.start_transaction().await.unwrap();
         let p6 = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             sequencer_id: another_sequencer.id,
             table_id: another_table.id,
             partition_id: another_partition.id,
-            created_at: time_38_hour_ago,
+            created_at: time_five_hour_ago,
             ..p1.clone()
         };
         let _pf6 = txn.parquet_files().create(p6).await.unwrap();
         txn.commit().await.unwrap();
         //
         // Will have 2 candidates, one for each sequencer
-        let mut candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let mut candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         candidates.sort();
         assert_eq!(candidates.len(), 2);
         assert_eq!(candidates[0].partition_id, partition3.id);
@@ -723,19 +749,307 @@ mod tests {
         let percentage_max_file_size = 30;
         let split_percentage = 80;
         let max_concurrent_size_bytes = 100_000;
+        let max_cold_concurrent_size_bytes = 90_000;
         let max_number_partitions_per_sequencer = 1;
         let min_number_recent_ingested_per_partition = 1;
         let input_size_threshold_bytes = 300 * 1024 * 1024;
+        let cold_input_size_threshold_bytes = 600 * 1024 * 1024;
         let input_file_count_threshold = 100;
+        let hot_multiple = 4;
 
         CompactorConfig::new(
             max_desired_file_size_bytes,
             percentage_max_file_size,
             split_percentage,
             max_concurrent_size_bytes,
+            max_cold_concurrent_size_bytes,
             max_number_partitions_per_sequencer,
             min_number_recent_ingested_per_partition,
             input_size_threshold_bytes,
+            cold_input_size_threshold_bytes,
             input_file_count_threshold,
+            hot_multiple,
         )
     }
+
+    #[tokio::test]
+    async fn test_cold_partitions_to_compact() {
+        let catalog = TestCatalog::new();
+
+        // Create a db with 2 sequencers, one with 4 empty partitions and the other one with one
+        // empty partition
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+
+        let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
+        let pool = txn.query_pools().create_or_get("foo").await.unwrap();
+        let namespace = txn
+            .namespaces()
+            .create(
+                "namespace_hot_partitions_to_compact",
+                "inf",
+                kafka.id,
+                pool.id,
+            )
+            .await
+            .unwrap();
+        let table = txn
+            .tables()
+            .create_or_get("test_table", namespace.id)
+            .await
+            .unwrap();
+        let sequencer = txn
+            .sequencers()
+            .create_or_get(&kafka, KafkaPartition::new(1))
+            .await
+            .unwrap();
+        let partition1 = txn
+            .partitions()
+            .create_or_get("one".into(), sequencer.id, table.id)
+            .await
+            .unwrap();
+        let partition2 = txn
+            .partitions()
+            .create_or_get("two".into(), sequencer.id, table.id)
+            .await
+            .unwrap();
+        let partition3 = txn
+            .partitions()
+            .create_or_get("three".into(), sequencer.id, table.id)
+            .await
+            .unwrap();
+        let partition4 = txn
+            .partitions()
+            .create_or_get("four".into(), sequencer.id, table.id)
+            .await
+            .unwrap();
+        let partition5 = txn
+            .partitions()
+            .create_or_get("five".into(), sequencer.id, table.id)
+            .await
+            .unwrap();
+        // other sequencer
+        let another_table = txn
+            .tables()
+            .create_or_get("another_test_table", namespace.id)
+            .await
+            .unwrap();
+        let another_sequencer = txn
+            .sequencers()
+            .create_or_get(&kafka, KafkaPartition::new(2))
+            .await
+            .unwrap();
+        let another_partition = txn
+            .partitions()
+            .create_or_get(
+                "another_partition".into(),
+                another_sequencer.id,
+                another_table.id,
+            )
+            .await
+            .unwrap();
+        // update sort key for this another_partition
+        let another_partition = txn
+            .partitions()
+            .update_sort_key(another_partition.id, &["tag1", "time"])
+            .await
+            .unwrap();
+        txn.commit().await.unwrap();
+
+        // Create a compactor
+        let time_provider = Arc::new(SystemProvider::new());
+        let config = make_compactor_config();
+        let compactor = Compactor::new(
+            vec![sequencer.id, another_sequencer.id],
+            Arc::clone(&catalog.catalog),
+            ParquetStorage::new(Arc::clone(&catalog.object_store)),
+            Arc::new(Executor::new(1)),
+            time_provider,
+            BackoffConfig::default(),
+            config,
+            Arc::new(metric::Registry::new()),
+        );
+
+        // Some times in the past to set to created_at of the files
+        let time_five_hour_ago = Timestamp::new(
+            (compactor.time_provider.now() - Duration::from_secs(60 * 60 * 5)).timestamp_nanos(),
+        );
+        let time_38_hour_ago = Timestamp::new(
+            (compactor.time_provider.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(),
+        );
+
+        // Basic parquet info
+        let p1 = ParquetFileParams {
+            sequencer_id: sequencer.id,
+            namespace_id: namespace.id,
+            table_id: table.id,
+            partition_id: partition1.id,
+            object_store_id: Uuid::new_v4(),
+            max_sequence_number: SequenceNumber::new(100),
+            min_time: Timestamp::new(1),
+            max_time: Timestamp::new(5),
+            file_size_bytes: 1337,
+            row_count: 0,
+            compaction_level: CompactionLevel::Initial, // level of file of new writes
+            created_at: time_38_hour_ago,               // create cold files by default
+            column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
+        };
+
+        // Note: The order of the test cases below is important and should not be changed
+        // because they depend on the order of the writes and their content. For example,
+        // in order to test `Case 3`, we do not need to add asserts for `Case 1` and `Case 2`,
+        // but all the writes, deletes and updates in Cases 1 and 2 are a must for testing Case 3.
+        // In other words, the last Case needs all content of previous tests.
+        // This shows the priority of selecting compaction candidates
+
+        // --------------------------------------
+        // Case 1: no files yet --> no partition candidates
+        //
+        let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
+        assert!(candidates.is_empty());
+
+        // --------------------------------------
+        // Case 2: no non-deleted cold L0 files --> no partition candidates
+        //
+        // partition1 has a cold deleted L0
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+        let pf1 = txn.parquet_files().create(p1.clone()).await.unwrap();
+        txn.parquet_files().flag_for_delete(pf1.id).await.unwrap();
+        //
+        // partition2 has a cold non-L0 file
+        let p2 = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition2.id,
+            ..p1.clone()
+        };
+        let pf2 = txn.parquet_files().create(p2).await.unwrap();
+        txn.parquet_files()
+            .update_to_level_1(&[pf2.id])
+            .await
+            .unwrap();
+        txn.commit().await.unwrap();
+        // No non-deleted level 0 files yet --> no candidates
+        let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
+        assert!(candidates.is_empty());
+
+        // --------------------------------------
+        // Case 3: no new recent writes (within the last 24 hours) --> return that partition
+        //
+        // partition2 has a cold (more than 24 hours ago) non-deleted level 0 file
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+        let p3 = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition2.id,
+            ..p1.clone()
+        };
+        let _pf3 = txn.parquet_files().create(p3).await.unwrap();
+        txn.commit().await.unwrap();
+        //
+        // Has at least one partition with a L0 file --> make it a candidate
+        let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
+        assert_eq!(candidates.len(), 1);
+        assert_eq!(candidates[0].partition_id, partition2.id);
+
+        // --------------------------------------
+        // Case 4: has two cold partitions --> return the candidate with the most L0
+        //
+        // partition4 has two cold non-deleted level 0 files
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+        let p4 = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition4.id,
+            ..p1.clone()
+        };
+        let _pf4 = txn.parquet_files().create(p4).await.unwrap();
+        let p5 = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition4.id,
+            ..p1.clone()
+        };
+        let _pf5 = txn.parquet_files().create(p5).await.unwrap();
+        txn.commit().await.unwrap();
+        // Partition with the most l0 files is the candidate
+        let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
+        assert_eq!(candidates.len(), 1);
+        assert_eq!(candidates[0].partition_id, partition4.id);
+
+        // --------------------------------------
+        // Case 5: "warm" and "hot" partitions aren't returned
+        //
+        // partition3 has one cold level 0 file and one hot level 0 file
+        // partition5 has one hot level 0 file
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+        let p3_cold = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition3.id,
+            ..p1.clone()
+        };
+        let _pf3_cold = txn.parquet_files().create(p3_cold).await.unwrap();
+        let p3_hot = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition3.id,
+            created_at: time_five_hour_ago,
+            ..p1.clone()
+        };
+        let _pf3_hot = txn.parquet_files().create(p3_hot).await.unwrap();
+        let p5_hot = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition5.id,
+            created_at: time_five_hour_ago,
+            ..p1.clone()
+        };
+        let _pf5_hot = txn.parquet_files().create(p5_hot).await.unwrap();
+        txn.commit().await.unwrap();
+        // Partition4 is still the only candidate
+        let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
+        assert_eq!(candidates.len(), 1);
+        assert_eq!(candidates[0].partition_id, partition4.id);
+
+        // Ask for 2 partitions per sequencer; get partition4 and partition2
+        let candidates = compactor.cold_partitions_to_compact(2).await.unwrap();
+        assert_eq!(candidates.len(), 2);
+        assert_eq!(candidates[0].partition_id, partition4.id);
+        assert_eq!(candidates[1].partition_id, partition2.id);
+
+        // Ask for 3 partitions per sequencer; still get only partition4 and partition2
+        let candidates = compactor.cold_partitions_to_compact(3).await.unwrap();
+        assert_eq!(candidates.len(), 2);
+        assert_eq!(candidates[0].partition_id, partition4.id);
+        assert_eq!(candidates[1].partition_id, partition2.id);
+
+        // --------------------------------------
+        // Case 6: has partition candidates for 2 sequencers
+        //
+        // The another_sequencer now has a non-deleted level-0 file ingested 38 hours ago
+        let mut txn = catalog.catalog.start_transaction().await.unwrap();
+        let p6 = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            sequencer_id: another_sequencer.id,
+            table_id: another_table.id,
+            partition_id: another_partition.id,
+            created_at: time_38_hour_ago,
+            ..p1.clone()
+        };
+        let _pf6 = txn.parquet_files().create(p6).await.unwrap();
+        txn.commit().await.unwrap();
+
+        // Will have 2 candidates, one for each sequencer
+        let mut candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
+        candidates.sort();
+        assert_eq!(candidates.len(), 2);
+        assert_eq!(candidates[0].partition_id, partition4.id);
+        assert_eq!(candidates[0].sequencer_id, sequencer.id);
+        assert_eq!(candidates[1].partition_id, another_partition.id);
+        assert_eq!(candidates[1].sequencer_id, another_sequencer.id);
+
+        // Ask for 2 candidates per sequencer; get back 3: 2 from sequencer and 1 from
+        // another_sequencer
+        let mut candidates = compactor.cold_partitions_to_compact(2).await.unwrap();
+        candidates.sort();
+        assert_eq!(candidates.len(), 3);
+        assert_eq!(candidates[0].partition_id, partition2.id);
+        assert_eq!(candidates[0].sequencer_id, sequencer.id);
+        assert_eq!(candidates[1].partition_id, partition4.id);
+        assert_eq!(candidates[1].sequencer_id, sequencer.id);
+        assert_eq!(candidates[2].partition_id, another_partition.id);
+        assert_eq!(candidates[2].sequencer_id, another_sequencer.id);
+    }
 }
diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs
index 8f7f34bb78..c638117c4a 100644
--- a/compactor/src/handler.rs
+++ b/compactor/src/handler.rs
@@ -126,11 +126,15 @@ pub struct CompactorConfig {
     /// This value must be between (0, 100)
     split_percentage: u16,
 
-    /// The compactor will limit the number of simultaneous compaction jobs based on the
-    /// size of the input files to be compacted. This number should be less than 1/10th
-    /// of the available memory to ensure compactions have
-    /// enough space to run.
-    max_concurrent_compaction_size_bytes: u64,
+    /// The compactor will limit the number of simultaneous hot partition compaction jobs based on
+    /// the size of the input files to be compacted. This number should be less than 1/10th of the
+    /// available memory to ensure compactions have enough space to run.
+    max_concurrent_size_bytes: u64,
+
+    /// The compactor will limit the number of simultaneous cold partition compaction jobs based on
+    /// the size of the input files to be compacted. This number should be less than 1/10th of the
+    /// available memory to ensure compactions have enough space to run.
+    max_cold_concurrent_size_bytes: u64,
 
     /// Max number of partitions per sequencer we want to compact per cycle
     max_number_partitions_per_sequencer: usize,
@@ -138,14 +142,19 @@ pub struct CompactorConfig {
     /// Min number of recent ingested files a partition needs to be considered for compacting
     min_number_recent_ingested_files_per_partition: usize,
 
-    /// A compaction operation will gather as many L0 files with their overlapping L1 files to
-    /// compact together until the total size of input files crosses this threshold. Later
-    /// compactions will pick up the remaining L0 files.
+    /// A compaction operation for hot partitions will gather as many L0 files with their
+    /// overlapping L1 files to compact together until the total size of input files crosses this
+    /// threshold. Later compactions will pick up the remaining L0 files.
     ///
     /// A compaction operation will be limited by this or by the file count threshold, whichever is
     /// hit first.
     input_size_threshold_bytes: u64,
 
+    /// A compaction operation for cold partitions will gather as many L0 files with their
+    /// overlapping L1 files to compact together until the total size of input files crosses this
+    /// threshold. Later compactions will pick up the remaining L0 files.
+    cold_input_size_threshold_bytes: u64,
+
     /// A compaction operation will gather as many L0 files with their overlapping L1 files to
     /// compact together until the total number of L0 + L1 files crosses this threshold. Later
     /// compactions will pick up the remaining L0 files.
@@ -153,6 +162,11 @@ pub struct CompactorConfig {
     /// A compaction operation will be limited by this or by the input size threshold, whichever is
     /// hit first.
     input_file_count_threshold: usize,
+
+    /// The multiple of times that compacting hot partitions should run for every one time that
+    /// compacting cold partitions runs. Set to 1 to compact hot partitions and cold partitions
+    /// equally.
+    hot_multiple: usize,
 }
 
 impl CompactorConfig {
@@ -162,11 +176,14 @@ impl CompactorConfig {
         max_desired_file_size_bytes: u64,
         percentage_max_file_size: u16,
         split_percentage: u16,
-        max_concurrent_compaction_size_bytes: u64,
+        max_concurrent_size_bytes: u64,
+        max_cold_concurrent_size_bytes: u64,
         max_number_partitions_per_sequencer: usize,
         min_number_recent_ingested_files_per_partition: usize,
         input_size_threshold_bytes: u64,
+        cold_input_size_threshold_bytes: u64,
         input_file_count_threshold: usize,
+        hot_multiple: usize,
     ) -> Self {
         assert!(split_percentage > 0 && split_percentage <= 100);
 
@@ -174,11 +191,14 @@ impl CompactorConfig {
             max_desired_file_size_bytes,
             percentage_max_file_size,
             split_percentage,
-            max_concurrent_compaction_size_bytes,
+            max_concurrent_size_bytes,
+            max_cold_concurrent_size_bytes,
             max_number_partitions_per_sequencer,
             min_number_recent_ingested_files_per_partition,
             input_size_threshold_bytes,
+            cold_input_size_threshold_bytes,
             input_file_count_threshold,
+            hot_multiple,
         }
     }
 
@@ -202,8 +222,8 @@ impl CompactorConfig {
     /// level 0 files, but should later also consider the level 1 files to be compacted. This
     /// number should be less than 1/10th of the available memory to ensure compactions have
     /// enough space to run.
-    pub fn max_concurrent_compaction_size_bytes(&self) -> u64 {
-        self.max_concurrent_compaction_size_bytes
+    pub fn max_concurrent_size_bytes(&self) -> u64 {
+        self.max_concurrent_size_bytes
     }
 
     /// Max number of partitions per sequencer we want to compact per cycle
@@ -216,9 +236,9 @@ impl CompactorConfig {
         self.min_number_recent_ingested_files_per_partition
     }
 
-    /// A compaction operation will gather as many L0 files with their overlapping L1 files to
-    /// compact together until the total size of input files crosses this threshold. Later
-    /// compactions will pick up the remaining L0 files.
+    /// A compaction operation for hot partitions will gather as many L0 files with their
+    /// overlapping L1 files to compact together until the total size of input files crosses this
+    /// threshold. Later compactions will pick up the remaining L0 files.
     ///
     /// A compaction operation will be limited by this or by the file count threshold, whichever is
     /// hit first.
@@ -226,6 +246,13 @@ impl CompactorConfig {
         self.input_size_threshold_bytes
     }
 
+    /// A compaction operation for cold partitions will gather as many L0 files with their
+    /// overlapping L1 files to compact together until the total size of input files crosses this
+    /// threshold. Later compactions will pick up the remaining L0 files.
+    pub fn cold_input_size_threshold_bytes(&self) -> u64 {
+        self.cold_input_size_threshold_bytes
+    }
+
     /// A compaction operation will gather as many L0 files with their overlapping L1 files to
     /// compact together until the total number of L0 + L1 files crosses this threshold. Later
     /// compactions will pick up the remaining L0 files.
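Note: the `run_compactor` rewrite below drives these two new code paths. A minimal standalone sketch of the schedule that `hot_multiple` produces (not IOx code; the value 4 is the default documented above):

```rust
fn main() {
    let hot_multiple = 4; // default from CompactorConfig
    let (mut hot_passes, mut cold_passes) = (0, 0);
    for _cycle in 0..2 {
        for _ in 0..hot_multiple {
            hot_passes += 1; // one compact_hot_partitions() pass would run here
        }
        cold_passes += 1; // one compact_cold_partitions() pass per cycle
    }
    // Two cycles => 8 hot passes to 2 cold passes, a 4:1 ratio
    assert_eq!((hot_passes, cold_passes), (8, 2));
}
```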
@@ -248,119 +275,235 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
     while !shutdown.is_cancelled() {
         debug!("compactor main loop tick.");
 
-        // Select partition candidates
-        let start_time = compactor.time_provider.now();
-        let candidates = Backoff::new(&compactor.backoff_config)
-            .retry_all_errors("partitions_to_compact", || async {
-                compactor
-                    .partitions_to_compact(
-                        compactor.config.max_number_partitions_per_sequencer(),
-                        compactor
-                            .config
-                            .min_number_recent_ingested_files_per_partition(),
-                    )
-                    .await
-            })
-            .await
-            .expect("retry forever");
-        if let Some(delta) = compactor
-            .time_provider
-            .now()
-            .checked_duration_since(start_time)
-        {
-            let duration = compactor
-                .candidate_selection_duration
-                .recorder(Attributes::from([]));
-            duration.record(delta);
+        for _ in 0..compactor.config.hot_multiple {
+            compact_hot_partitions(Arc::clone(&compactor)).await;
         }
+        compact_cold_partitions(Arc::clone(&compactor)).await;
+    }
+}
 
-        // Add other compaction-needed info into selected partitions
-        let start_time = compactor.time_provider.now();
-        let candidates = Backoff::new(&compactor.backoff_config)
-            .retry_all_errors("partitions_to_compact", || async {
-                compactor.add_info_to_partitions(&candidates).await
+async fn compact_hot_partitions(compactor: Arc<Compactor>) {
+    // Select hot partition candidates
+    let hot_attributes = Attributes::from(&[("partition_type", "hot")]);
+    let start_time = compactor.time_provider.now();
+    let candidates = Backoff::new(&compactor.backoff_config)
+        .retry_all_errors("hot_partitions_to_compact", || async {
+            compactor
+                .hot_partitions_to_compact(
+                    compactor.config.max_number_partitions_per_sequencer(),
+                    compactor
+                        .config
+                        .min_number_recent_ingested_files_per_partition(),
+                )
+                .await
+        })
+        .await
+        .expect("retry forever");
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor
+            .candidate_selection_duration
+            .recorder(hot_attributes.clone());
+        duration.record(delta);
+    }
+
+    // Add other compaction-needed info into selected partitions
+    let start_time = compactor.time_provider.now();
+    let candidates = Backoff::new(&compactor.backoff_config)
+        .retry_all_errors("add_info_to_partitions", || async {
+            compactor.add_info_to_partitions(&candidates).await
+        })
+        .await
+        .expect("retry forever");
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor
+            .partitions_extra_info_reading_duration
+            .recorder(hot_attributes.clone());
+        duration.record(delta);
+    }
+
+    let n_candidates = candidates.len();
+    if n_candidates == 0 {
+        debug!("no hot compaction candidates found");
+        // sleep for a second to avoid a hot busy loop when the catalog is polled
+        tokio::time::sleep(PAUSE_BETWEEN_NO_WORK).await;
+        return;
+    } else {
+        debug!(n_candidates, "found hot compaction candidates");
+    }
+
+    let start_time = compactor.time_provider.now();
+
+    // Repeat compacting n partitions in parallel until all candidates are compacted.
+    // Concurrency level calculation (this is estimated from previous experiments. The actual
+    // resource management will be more complicated and a future feature):
+    // . Each `compact partition` takes max of this much memory input_size_threshold_bytes
+    // . We have this memory budget: max_concurrent_size_bytes
+    // --> num_parallel_partitions = max_concurrent_size_bytes/
+    //     input_size_threshold_bytes
+    let num_parallel_partitions = (compactor.config.max_concurrent_size_bytes
+        / compactor.config.input_size_threshold_bytes) as usize;
+
+    futures::stream::iter(candidates)
+        .map(|p| {
+            // run compaction in its own task
+            let comp = Arc::clone(&compactor);
+            tokio::task::spawn(async move {
+                let partition_id = p.candidate.partition_id;
+                let compaction_result = crate::compact_hot_partition(&comp, p).await;
+
+                match compaction_result {
+                    Err(e) => {
+                        warn!(?e, ?partition_id, "hot compaction failed");
+                    }
+                    Ok(_) => {
+                        debug!(?partition_id, "hot compaction complete");
+                    }
+                };
             })
-            .await
-            .expect("retry forever");
-        if let Some(delta) = compactor
-            .time_provider
-            .now()
-            .checked_duration_since(start_time)
-        {
-            let duration = compactor
-                .partitions_extra_info_reading_duration
-                .recorder(Attributes::from([]));
-            duration.record(delta);
-        }
+        })
+        // Assume we have enough resources to run
+        // num_parallel_partitions compactions in parallel
+        .buffer_unordered(num_parallel_partitions)
+        // report any JoinErrors (aka task panics)
+        .map(|join_result| {
+            if let Err(e) = join_result {
+                warn!(?e, "hot compaction task failed");
+            }
+            Ok(())
+        })
+        // Errors are reported during execution, so ignore results here
+        // https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
+        .forward(futures::sink::drain())
+        .await
+        .ok();
 
-        let n_candidates = candidates.len();
-        if n_candidates == 0 {
-            debug!("no compaction candidates found");
-            // sleep for a second to avoid a hot busy loop when the
-            // catalog is polled
-            tokio::time::sleep(PAUSE_BETWEEN_NO_WORK).await;
-            continue;
-        } else {
-            debug!(n_candidates, "found compaction candidates");
-        }
+    // Done compacting all candidates in the cycle, record its time
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor.compaction_cycle_duration.recorder(hot_attributes);
+        duration.record(delta);
+    }
+}
 
-        let start_time = compactor.time_provider.now();
+async fn compact_cold_partitions(compactor: Arc<Compactor>) {
+    let cold_attributes = Attributes::from(&[("partition_type", "cold")]);
+    // Select cold partition candidates
+    let start_time = compactor.time_provider.now();
+    let candidates = Backoff::new(&compactor.backoff_config)
+        .retry_all_errors("cold_partitions_to_compact", || async {
+            compactor
+                .cold_partitions_to_compact(compactor.config.max_number_partitions_per_sequencer())
+                .await
+        })
+        .await
+        .expect("retry forever");
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor
+            .candidate_selection_duration
+            .recorder(cold_attributes.clone());
+        duration.record(delta);
+    }
 
-        // Repeat compacting n partitions in parallel until all candidates are compacted.
-        // Concurrency level calculation (this is estimated from previous experiments. The actual resource
-        // management will be more complicated and a future feature):
-        // . Each `compact partititon` takes max of this much memory input_size_threshold_bytes
-        // . We have this memory budget: max_concurrent_compaction_size_bytes
-        // --> num_parallel_partitions = max_concurrent_compaction_size_bytes/ input_size_threshold_bytes
-        let num_parallel_partitions = (compactor.config.max_concurrent_compaction_size_bytes
-            / compactor.config.input_size_threshold_bytes)
-            as usize;
+    // Add other compaction-needed info into selected partitions
+    let start_time = compactor.time_provider.now();
+    let candidates = Backoff::new(&compactor.backoff_config)
+        .retry_all_errors("add_info_to_partitions", || async {
+            compactor.add_info_to_partitions(&candidates).await
+        })
+        .await
+        .expect("retry forever");
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor
+            .partitions_extra_info_reading_duration
+            .recorder(cold_attributes.clone());
+        duration.record(delta);
+    }
 
-        futures::stream::iter(candidates)
-            .map(|p| {
-                // run compaction in its own task
-                let comp = Arc::clone(&compactor);
-                tokio::task::spawn(async move {
-                    let partition_id = p.candidate.partition_id;
-                    let compaction_result = crate::compact_partition(&comp, p).await;
+    let n_candidates = candidates.len();
+    if n_candidates == 0 {
+        debug!("no cold compaction candidates found");
+        return;
+    } else {
+        debug!(n_candidates, "found cold compaction candidates");
+    }
 
-                    match compaction_result {
-                        Err(e) => {
-                            warn!(?e, ?partition_id, "compaction failed");
-                        }
-                        Ok(_) => {
-                            debug!(?partition_id, "compaction complete");
-                        }
-                    };
-                })
+    let start_time = compactor.time_provider.now();
+
+    // Repeat compacting n cold partitions in parallel until all candidates are compacted.
+    // Concurrency level calculation (this is estimated from previous experiments. The actual
+    // resource management will be more complicated and a future feature):
+    //
+    // . Each `compact partition` takes max of this much memory cold_input_size_threshold_bytes
+    // . We have this memory budget: max_cold_concurrent_size_bytes
+    // --> num_parallel_partitions = max_cold_concurrent_size_bytes/
+    //     cold_input_size_threshold_bytes
+    let num_parallel_partitions = (compactor.config.max_cold_concurrent_size_bytes
+        / compactor.config.cold_input_size_threshold_bytes)
+        as usize;
+
+    futures::stream::iter(candidates)
+        .map(|p| {
+            // run compaction in its own task
+            let comp = Arc::clone(&compactor);
+            tokio::task::spawn(async move {
+                let partition_id = p.candidate.partition_id;
+                let compaction_result = crate::compact_cold_partition(&comp, p).await;
+
+                match compaction_result {
+                    Err(e) => {
+                        warn!(?e, ?partition_id, "cold compaction failed");
+                    }
+                    Ok(_) => {
+                        debug!(?partition_id, "cold compaction complete");
+                    }
+                };
             })
-            // Assume we have enough resources to run
-            // num_parallel_partitions compactions in parallel
-            .buffer_unordered(num_parallel_partitions)
-            // report any JoinErrors (aka task panics)
-            .map(|join_result| {
-                if let Err(e) = join_result {
-                    warn!(?e, "compaction task failed");
-                }
-                Ok(())
-            })
-            // Errors are reported during execution, so ignore results here
-            // https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
-            .forward(futures::sink::drain())
-            .await
-            .ok();
+        })
+        // Assume we have enough resources to run
+        // num_parallel_partitions compactions in parallel
+        .buffer_unordered(num_parallel_partitions)
+        // report any JoinErrors (aka task panics)
+        .map(|join_result| {
+            if let Err(e) = join_result {
+                warn!(?e, "cold compaction task failed");
+            }
+            Ok(())
+        })
+        // Errors are reported during execution, so ignore results here
+        // https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
+        .forward(futures::sink::drain())
+        .await
+        .ok();
 
-        // Done compacting all candidates in the cycle, record its time
-        if let Some(delta) = compactor
-            .time_provider
-            .now()
-            .checked_duration_since(start_time)
-        {
-            let duration = compactor
-                .compaction_cycle_duration
-                .recorder(Attributes::from([]));
-            duration.record(delta);
-        }
+    // Done compacting all candidates in the cycle, record its time
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor
+            .compaction_cycle_duration
+            .recorder(cold_attributes);
+        duration.record(delta);
     }
 }
diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs
index 8e045881bd..8a3205881f 100644
--- a/compactor/src/lib.rs
+++ b/compactor/src/lib.rs
@@ -21,6 +21,7 @@ pub mod server;
 pub mod utils;
 
 use crate::compact::{Compactor, PartitionCompactionCandidateWithInfo};
+use data_types::CompactionLevel;
 use metric::Attributes;
 use snafu::{ResultExt, Snafu};
 use std::sync::Arc;
@@ -29,18 +30,23 @@ use std::sync::Arc;
 #[allow(missing_copy_implementations, missing_docs)]
 pub(crate) enum Error {
     #[snafu(display("{}", source))]
-    ParquetFileLookup {
+    Lookup {
         source: parquet_file_lookup::PartitionFilesFromPartitionError,
     },
 
     #[snafu(display("{}", source))]
-    ParquetFileCombining {
+    Combining {
         source: parquet_file_combining::Error,
     },
+
+    #[snafu(display("{}", source))]
+    Upgrading {
+        source: iox_catalog::interface::Error,
+    },
 }
 
-/// One compaction operation of one partition
-pub(crate) async fn compact_partition(
+/// One compaction operation of one hot partition
+pub(crate) async fn compact_hot_partition(
     compactor: &Compactor,
     partition: PartitionCompactionCandidateWithInfo,
 ) -> Result<(), Error> {
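Note: both `compact_hot_partitions` and `compact_cold_partitions` above derive their parallelism by integer division of the memory budget by the per-job input threshold. A minimal sketch evaluating that arithmetic with the CLI defaults from `clap_blocks` (not IOx code):

```rust
fn main() {
    // Hot: 1 GiB budget / 100 MiB per job => 10 concurrent compactions
    let max_concurrent_size_bytes: u64 = 1024 * 1024 * 1024;
    let input_size_threshold_bytes: u64 = 1024 * 1024 * 100;
    assert_eq!(max_concurrent_size_bytes / input_size_threshold_bytes, 10);

    // Cold: 900 MiB budget / 600 MiB per job => 1 concurrent compaction,
    // because integer division truncates 1.5 down to 1
    let max_cold_concurrent_size_bytes: u64 = 1024 * 1024 * 900;
    let cold_input_size_threshold_bytes: u64 = 1024 * 1024 * 600;
    assert_eq!(
        max_cold_concurrent_size_bytes / cold_input_size_threshold_bytes,
        1
    );
}
```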
@@ -53,9 +59,9 @@ pub(crate) async fn compact_partition( partition.id(), ) .await - .context(ParquetFileLookupSnafu)?; + .context(LookupSnafu)?; - let to_compact = parquet_file_filtering::filter_parquet_files( + let to_compact = parquet_file_filtering::filter_hot_parquet_files( parquet_files_for_compaction, compactor.config.input_size_threshold_bytes(), compactor.config.input_file_count_threshold(), @@ -76,9 +82,79 @@ pub(crate) async fn compact_partition( compactor.config.split_percentage(), ) .await - .context(ParquetFileCombiningSnafu); + .context(CombiningSnafu); - let attributes = Attributes::from([("sequencer_id", format!("{}", sequencer_id).into())]); + let attributes = Attributes::from([ + ("sequencer_id", format!("{}", sequencer_id).into()), + ("partition_type", "hot".into()), + ]); + if let Some(delta) = compactor + .time_provider + .now() + .checked_duration_since(start_time) + { + let duration = compactor.compaction_duration.recorder(attributes); + duration.record(delta); + } + + compact_result +} + +/// One compaction operation of one cold partition +pub(crate) async fn compact_cold_partition( + compactor: &Compactor, + partition: PartitionCompactionCandidateWithInfo, +) -> Result<(), Error> { + let start_time = compactor.time_provider.now(); + let sequencer_id = partition.sequencer_id(); + + let parquet_files_for_compaction = + parquet_file_lookup::ParquetFilesForCompaction::for_partition( + Arc::clone(&compactor.catalog), + partition.id(), + ) + .await + .context(LookupSnafu)?; + + let to_compact = parquet_file_filtering::filter_cold_parquet_files( + parquet_files_for_compaction, + compactor.config.cold_input_size_threshold_bytes(), + &compactor.parquet_file_candidate_gauge, + &compactor.parquet_file_candidate_bytes_gauge, + ); + + let compact_result = + if to_compact.len() == 1 && to_compact[0].compaction_level == CompactionLevel::Initial { + // upgrade the one l0 file to l1, don't run compaction + let mut repos = compactor.catalog.repositories().await; + + repos + .parquet_files() + .update_to_level_1(&[to_compact[0].id]) + .await + .context(UpgradingSnafu)?; + Ok(()) + } else { + parquet_file_combining::compact_parquet_files( + to_compact, + partition, + Arc::clone(&compactor.catalog), + compactor.store.clone(), + Arc::clone(&compactor.exec), + Arc::clone(&compactor.time_provider), + &compactor.compaction_counter, + compactor.config.max_desired_file_size_bytes(), + compactor.config.percentage_max_file_size(), + compactor.config.split_percentage(), + ) + .await + .context(CombiningSnafu) + }; + + let attributes = Attributes::from([ + ("sequencer_id", format!("{}", sequencer_id).into()), + ("partition_type", "cold".into()), + ]); if let Some(delta) = compactor .time_provider .now() @@ -103,12 +179,13 @@ mod tests { use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable}; use iox_time::{SystemProvider, TimeProvider}; use parquet_file::{storage::ParquetStorage, ParquetFilePath}; + use std::time::Duration; // A quite sophisticated integration test // Beside lp data, every value min/max sequence numbers and min/max time are important // to have a combination of needed tests in this test function #[tokio::test] - async fn test_compact_partition_many_files() { + async fn test_compact_hot_partition_many_files() { test_helpers::maybe_start_logging(); let catalog = TestCatalog::new(); @@ -253,7 +330,7 @@ mod tests { // ------------------------------------------------ // Compact let candidates = compactor - .partitions_to_compact( + .hot_partitions_to_compact( 
compactor.config.max_number_partitions_per_sequencer(), compactor .config @@ -266,7 +343,7 @@ mod tests { assert_eq!(candidates.len(), 1); let c = candidates.pop().unwrap(); - compact_partition(&compactor, c).await.unwrap(); + compact_hot_partition(&compactor, c).await.unwrap(); // Should have 3 non-soft-deleted files: // @@ -328,6 +405,362 @@ mod tests { ); } + #[tokio::test] + async fn test_compact_cold_partition_many_files() { + test_helpers::maybe_start_logging(); + let catalog = TestCatalog::new(); + + // lp1 does not overlap with any other level 0 + let lp1 = vec![ + "table,tag1=WA field_int=1000i 10", + "table,tag1=VT field_int=10i 20", + ] + .join("\n"); + + // lp2 overlaps with lp3 + let lp2 = vec![ + "table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate + "table,tag1=VT field_int=10i 10000", + "table,tag1=UT field_int=70i 20000", + ] + .join("\n"); + + // lp3 overlaps with lp2 + let lp3 = vec![ + "table,tag1=WA field_int=1500i 8000", // latest duplicate and kept + "table,tag1=VT field_int=10i 6000", + "table,tag1=UT field_int=270i 25000", + ] + .join("\n"); + + // lp4 does not overlap with any + let lp4 = vec![ + "table,tag2=WA,tag3=10 field_int=1600i 28000", + "table,tag2=VT,tag3=20 field_int=20i 26000", + ] + .join("\n"); + + // lp5 overlaps with lp1 + let lp5 = vec![ + "table,tag2=PA,tag3=15 field_int=1601i 9", + "table,tag2=OH,tag3=21 field_int=21i 25", + ] + .join("\n"); + + // lp6 does not overlap with any + let lp6 = vec![ + "table,tag2=PA,tag3=15 field_int=81601i 90000", + "table,tag2=OH,tag3=21 field_int=421i 91000", + ] + .join("\n"); + + let ns = catalog.create_namespace("ns").await; + let sequencer = ns.create_sequencer(1).await; + let table = ns.create_table("table").await; + table.create_column("field_int", ColumnType::I64).await; + table.create_column("tag1", ColumnType::Tag).await; + table.create_column("tag2", ColumnType::Tag).await; + table.create_column("tag3", ColumnType::Tag).await; + table.create_column("time", ColumnType::Time).await; + let partition = table + .with_sequencer(&sequencer) + .create_partition("part") + .await; + let time = Arc::new(SystemProvider::new()); + let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(); + let config = make_compactor_config(); + let metrics = Arc::new(metric::Registry::new()); + let compactor = Compactor::new( + vec![sequencer.sequencer.id], + Arc::clone(&catalog.catalog), + ParquetStorage::new(Arc::clone(&catalog.object_store)), + Arc::new(Executor::new(1)), + Arc::new(SystemProvider::new()), + BackoffConfig::default(), + config, + Arc::clone(&metrics), + ); + + // parquet files that are all in the same partition + + // pf1 does not overlap with any other level 0 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp1) + .with_max_seq(3) + .with_min_time(10) + .with_max_time(20) + .with_file_size_bytes(compactor.config.max_desired_file_size_bytes() + 10) + .with_creation_time(time_38_hour_ago); + partition.create_parquet_file(builder).await; + + // pf2 overlaps with pf3 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp2) + .with_max_seq(5) + .with_min_time(8_000) + .with_max_time(20_000) + .with_file_size_bytes(100) // small file + .with_creation_time(time_38_hour_ago); + partition.create_parquet_file(builder).await; + + // pf3 overlaps with pf2 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp3) + .with_max_seq(10) + .with_min_time(6_000) + .with_max_time(25_000) + 
.with_file_size_bytes(100) // small file + .with_creation_time(time_38_hour_ago); + partition.create_parquet_file(builder).await; + + // pf4 does not overlap with any but is small + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp4) + .with_max_seq(18) + .with_min_time(26_000) + .with_max_time(28_000) + .with_file_size_bytes(100) // small file + .with_creation_time(time_38_hour_ago); + partition.create_parquet_file(builder).await; + + // pf5 was created in a previous compaction cycle; overlaps with pf1 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp5) + .with_max_seq(1) + .with_min_time(9) + .with_max_time(25) + .with_file_size_bytes(100) // small file + .with_creation_time(time_38_hour_ago) + .with_compaction_level(CompactionLevel::FileNonOverlapped); + partition.create_parquet_file(builder).await; + + // pf6 was created in a previous compaction cycle; does not overlap with any + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp6) + .with_max_seq(20) + .with_min_time(90000) + .with_max_time(91000) + .with_file_size_bytes(100) // small file + .with_creation_time(time_38_hour_ago) + .with_compaction_level(CompactionLevel::FileNonOverlapped); + partition.create_parquet_file(builder).await; + + // should have 4 level-0 files before compacting + let count = catalog.count_level_0_files(sequencer.sequencer.id).await; + assert_eq!(count, 4); + + // ------------------------------------------------ + // Compact + let candidates = compactor + .cold_partitions_to_compact(compactor.config.max_number_partitions_per_sequencer()) + .await + .unwrap(); + let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap(); + + assert_eq!(candidates.len(), 1); + let c = candidates.pop().unwrap(); + + compact_cold_partition(&compactor, c).await.unwrap(); + + // Should have 3 non-soft-deleted files: + // + // - the level 1 file that didn't overlap with anything + // - the two newly created after compacting and splitting pf1, pf2, pf3, pf4, pf5 + let mut files = catalog.list_by_table_not_to_delete(table.table.id).await; + assert_eq!(files.len(), 3); + let files_and_levels: Vec<_> = files + .iter() + .map(|f| (f.id.get(), f.compaction_level)) + .collect(); + assert_eq!( + files_and_levels, + vec![ + (6, CompactionLevel::FileNonOverlapped), + (7, CompactionLevel::FileNonOverlapped), + (8, CompactionLevel::FileNonOverlapped), + ] + ); + + // ------------------------------------------------ + // Verify the parquet file content + + // Later compacted file + let file1 = files.pop().unwrap(); + let batches = read_parquet_file(&table, file1).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+------+------+-----------------------------+", + "| field_int | tag1 | tag2 | tag3 | time |", + "+-----------+------+------+------+-----------------------------+", + "| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |", + "| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |", + "| 270 | UT | | | 1970-01-01T00:00:00.000025Z |", + "+-----------+------+------+------+-----------------------------+", + ], + &batches + ); + + // Earlier compacted file + let file0 = files.pop().unwrap(); + let batches = read_parquet_file(&table, file0).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+------+------+--------------------------------+", + "| field_int | tag1 | tag2 | tag3 | time |", + "+-----------+------+------+------+--------------------------------+", + "| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |", + "| 10 | VT | | | 
1970-01-01T00:00:00.000006Z |", + "| 10 | VT | | | 1970-01-01T00:00:00.000010Z |", + "| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |", + "| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |", + "| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |", + "| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |", + "| 70 | UT | | | 1970-01-01T00:00:00.000020Z |", + "+-----------+------+------+------+--------------------------------+", + ], + &batches + ); + } + + #[tokio::test] + async fn test_compact_cold_partition_one_level_0_without_overlap() { + test_helpers::maybe_start_logging(); + let catalog = TestCatalog::new(); + + // lp1 does not overlap with any other level 0 or level 1 + let lp1 = vec![ + "table,tag1=WA field_int=1000i 10", + "table,tag1=VT field_int=10i 20", + ] + .join("\n"); + + // lp6 does not overlap with any + let lp6 = vec![ + "table,tag2=PA,tag3=15 field_int=81601i 90000", + "table,tag2=OH,tag3=21 field_int=421i 91000", + ] + .join("\n"); + + let ns = catalog.create_namespace("ns").await; + let sequencer = ns.create_sequencer(1).await; + let table = ns.create_table("table").await; + table.create_column("field_int", ColumnType::I64).await; + table.create_column("tag1", ColumnType::Tag).await; + table.create_column("tag2", ColumnType::Tag).await; + table.create_column("tag3", ColumnType::Tag).await; + table.create_column("time", ColumnType::Time).await; + let partition = table + .with_sequencer(&sequencer) + .create_partition("part") + .await; + let time = Arc::new(SystemProvider::new()); + let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(); + let config = make_compactor_config(); + let metrics = Arc::new(metric::Registry::new()); + let compactor = Compactor::new( + vec![sequencer.sequencer.id], + Arc::clone(&catalog.catalog), + ParquetStorage::new(Arc::clone(&catalog.object_store)), + Arc::new(Executor::new(1)), + Arc::new(SystemProvider::new()), + BackoffConfig::default(), + config, + Arc::clone(&metrics), + ); + + // parquet files that are all in the same partition + + // pf1 does not overlap with any other level 0 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp1) + .with_max_seq(3) + .with_min_time(10) + .with_max_time(20) + .with_file_size_bytes(compactor.config.max_desired_file_size_bytes() + 10) + .with_creation_time(time_38_hour_ago); + partition.create_parquet_file(builder).await; + + // pf6 was created in a previous compaction cycle; does not overlap with any + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp6) + .with_max_seq(20) + .with_min_time(90000) + .with_max_time(91000) + .with_file_size_bytes(100) // small file + .with_creation_time(time_38_hour_ago) + .with_compaction_level(CompactionLevel::FileNonOverlapped); + partition.create_parquet_file(builder).await; + + // should have 1 level-0 file before compacting + let count = catalog.count_level_0_files(sequencer.sequencer.id).await; + assert_eq!(count, 1); + + // ------------------------------------------------ + // Compact + let candidates = compactor + .cold_partitions_to_compact(compactor.config.max_number_partitions_per_sequencer()) + .await + .unwrap(); + let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap(); + + assert_eq!(candidates.len(), 1); + let c = candidates.pop().unwrap(); + + compact_cold_partition(&compactor, c).await.unwrap(); + + // Should have 2 non-soft-deleted files: + // + // - the level 1 file that didn't overlap with anything + // - the newly created level 1 file that 
was only upgraded from level 0 + // - the two newly created after compacting and splitting pf1, pf2, pf3, pf4, pf5 + let mut files = catalog.list_by_table_not_to_delete(table.table.id).await; + assert_eq!(files.len(), 2); + let files_and_levels: Vec<_> = files + .iter() + .map(|f| (f.id.get(), f.compaction_level)) + .collect(); + assert_eq!( + files_and_levels, + vec![ + (1, CompactionLevel::FileNonOverlapped), + (2, CompactionLevel::FileNonOverlapped), + ] + ); + + // ------------------------------------------------ + // Verify the parquet file content + + // Later compacted file + let file1 = files.pop().unwrap(); + let batches = read_parquet_file(&table, file1).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+------+-----------------------------+", + "| field_int | tag2 | tag3 | time |", + "+-----------+------+------+-----------------------------+", + "| 421 | OH | 21 | 1970-01-01T00:00:00.000091Z |", + "| 81601 | PA | 15 | 1970-01-01T00:00:00.000090Z |", + "+-----------+------+------+-----------------------------+", + ], + &batches + ); + + // Earlier compacted file + let file0 = files.pop().unwrap(); + let batches = read_parquet_file(&table, file0).await; + assert_batches_sorted_eq!( + &[ + "+-----------+------+--------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+--------------------------------+", + "| 10 | VT | 1970-01-01T00:00:00.000000020Z |", + "| 1000 | WA | 1970-01-01T00:00:00.000000010Z |", + "+-----------+------+--------------------------------+", + ], + &batches + ); + } + async fn read_parquet_file(table: &Arc, file: ParquetFile) -> Vec { let storage = ParquetStorage::new(table.catalog.object_store()); @@ -354,19 +787,26 @@ mod tests { let percentage_max_file_size = 30; let split_percentage = 80; let max_concurrent_size_bytes = 100_000; + let max_cold_concurrent_size_bytes = 90_000; let max_number_partitions_per_sequencer = 1; let min_number_recent_ingested_per_partition = 1; let input_size_threshold_bytes = 300 * 1024 * 1024; + let cold_input_size_threshold_bytes = 600 * 1024 * 1024; let input_file_count_threshold = 100; + let hot_multiple = 4; + CompactorConfig::new( max_desired_file_size_bytes, percentage_max_file_size, split_percentage, max_concurrent_size_bytes, + max_cold_concurrent_size_bytes, max_number_partitions_per_sequencer, min_number_recent_ingested_per_partition, input_size_threshold_bytes, + cold_input_size_threshold_bytes, input_file_count_threshold, + hot_multiple, ) } } diff --git a/compactor/src/parquet_file_filtering.rs b/compactor/src/parquet_file_filtering.rs index d2fddcab36..650c160712 100644 --- a/compactor/src/parquet_file_filtering.rs +++ b/compactor/src/parquet_file_filtering.rs @@ -6,8 +6,8 @@ use data_types::ParquetFile; use metric::{Attributes, Metric, U64Gauge}; use observability_deps::tracing::*; -/// Given a list of level 0 files sorted by max sequence number and a list of level 1 files for a -/// partition, select a subset set of files that: +/// Given a list of hot level 0 files sorted by max sequence number and a list of level 1 files for +/// a partition, select a subset set of files that: /// /// - Has a subset of the level 0 files selected, from the start of the sorted level 0 list /// - Has a total size less than `max_bytes` @@ -16,7 +16,7 @@ use observability_deps::tracing::*; /// /// The returned files will be ordered with the level 1 files first, then the level 0 files ordered /// in ascending order by their max sequence number. 
diff --git a/compactor/src/parquet_file_filtering.rs b/compactor/src/parquet_file_filtering.rs index d2fddcab36..650c160712 100644 --- a/compactor/src/parquet_file_filtering.rs +++ b/compactor/src/parquet_file_filtering.rs @@ -6,8 +6,8 @@ use data_types::ParquetFile; use metric::{Attributes, Metric, U64Gauge}; use observability_deps::tracing::*; -/// Given a list of level 0 files sorted by max sequence number and a list of level 1 files for a -/// partition, select a subset set of files that: +/// Given a list of hot level 0 files sorted by max sequence number and a list of level 1 files for +/// a partition, select a subset of files that: /// /// - Has a subset of the level 0 files selected, from the start of the sorted level 0 list /// - Has a total size less than `max_bytes` @@ -16,7 +16,7 @@ use observability_deps::tracing::*; /// /// The returned files will be ordered with the level 1 files first, then the level 0 files ordered /// in ascending order by their max sequence number. -pub(crate) fn filter_parquet_files( +pub(crate) fn filter_hot_parquet_files( // Level 0 files sorted by max sequence number and level 1 files in arbitrary order for one // partition parquet_files_for_compaction: ParquetFilesForCompaction, @@ -37,7 +37,7 @@ pub(crate) fn filter_parquet_files( } = parquet_files_for_compaction; if level_0.is_empty() { - info!("No level 0 files to consider for compaction"); + info!("No hot level 0 files to consider for compaction"); return Vec::new(); } @@ -79,7 +79,7 @@ pub(crate) fn filter_parquet_files( // Increase the running total by the size of all the overlapping level 1 files total_level_1_bytes += overlaps.iter().map(|f| f.file_size_bytes).sum::<i64>() as u64; - // Move the overlapping level 1 files to`files_to_return` so they're not considered again; + // Move the overlapping level 1 files to `files_to_return` so they're not considered again; // a level 1 file overlapping with one level 0 file is enough for its inclusion. This way, // we also don't include level 1 files multiple times. files_to_return.extend(overlaps); @@ -107,7 +107,120 @@ pub(crate) fn filter_parquet_files( num_level_1_considering, num_level_0_compacting, num_level_1_compacting, - "filtered Parquet files for compaction", + "filtered hot Parquet files for compaction", + ); + + record_file_metrics( + parquet_file_candidate_gauge, + num_level_0_considering as u64, + num_level_1_considering as u64, + num_level_0_compacting as u64, + num_level_1_compacting as u64, + ); + record_byte_metrics( + parquet_file_candidate_bytes_gauge, + total_level_0_bytes as u64, + total_level_1_bytes as u64, + ); + + // Return the level 1 files first, followed by the level 0 files assuming we've maintained + // their ordering by max sequence number. + files_to_return.extend(level_0_to_return); + files_to_return +} +
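Both the hot filter above and the cold filter below lean on overlaps_in_time, which this diff does not touch. Assuming the usual inclusive min_time/max_time semantics of ParquetFile, its shape is presumably the standard interval-intersection test; this is an assumption, not code from the PR:

    // Assumed shape of the overlap predicate: two files overlap when neither
    // time range ends before the other begins (bounds inclusive).
    fn overlaps_in_time(a: &ParquetFile, b: &ParquetFile) -> bool {
        a.min_time <= b.max_time && a.max_time >= b.min_time
    }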
+/// Given a list of cold level 0 files sorted by max sequence number and a list of level 1 files for +/// a partition, select a subset of files that: +/// +/// - Has as many level 0 files selected as fit, from the start of the sorted level 0 list, +/// stopping once the total size of the selected files reaches `max_bytes` +/// - Has only level 1 files that overlap in time with the selected level 0 files +/// +/// The returned files will be ordered with the level 1 files first, then the level 0 files ordered +/// in ascending order by their max sequence number. +/// +/// If only one level 0 file is returned, it can be upgraded to level 1 without running compaction. +pub(crate) fn filter_cold_parquet_files( + // Level 0 files sorted by max sequence number and level 1 files in arbitrary order for one + // partition + parquet_files_for_compaction: ParquetFilesForCompaction, + // Stop considering level 0 files when the total size of all files selected for compaction so + // far exceeds this value + max_bytes: u64, + // Gauge for the number of Parquet file candidates + parquet_file_candidate_gauge: &Metric<U64Gauge>, + // Gauge for the number of bytes of Parquet file candidates + parquet_file_candidate_bytes_gauge: &Metric<U64Gauge>, +) -> Vec<ParquetFile> { + let ParquetFilesForCompaction { + level_0, + level_1: mut remaining_level_1, + } = parquet_files_for_compaction; + + if level_0.is_empty() { + info!("No cold level 0 files to consider for compaction"); + return Vec::new(); + } + + // Guaranteed to exist because of the empty check and early return above. Also assuming all + // files are for the same partition. + let partition_id = level_0[0].partition_id; + + let num_level_0_considering = level_0.len(); + let num_level_1_considering = remaining_level_1.len(); + + // This will start by holding the level 1 files that are found to overlap an included level 0 + // file. At the end of this function, the level 0 files are added to the end so they are sorted + // last. + let mut files_to_return = Vec::with_capacity(level_0.len() + remaining_level_1.len()); + // Keep track of level 0 files to include in this compaction operation; maintain assumed + // ordering by max sequence number. + let mut level_0_to_return = Vec::with_capacity(level_0.len()); + // Running total of the size, in bytes, of level 0 files and level 1 files for inclusion. + // The levels are counted up separately for metrics. + let mut total_level_0_bytes = 0; + let mut total_level_1_bytes = 0; + + for level_0_file in level_0 { + // Include at least one level 0 file without checking against `max_bytes` + total_level_0_bytes += level_0_file.file_size_bytes as u64; + + // Find all level 1 files that overlap with this level 0 file. + let (overlaps, non_overlaps): (Vec<_>, Vec<_>) = remaining_level_1 + .into_iter() + .partition(|level_1_file| overlaps_in_time(level_1_file, &level_0_file)); + + // Increase the running total by the size of all the overlapping level 1 files + total_level_1_bytes += overlaps.iter().map(|f| f.file_size_bytes).sum::<i64>() as u64; + + // Move the overlapping level 1 files to `files_to_return` so they're not considered again; + // a level 1 file overlapping with one level 0 file is enough for its inclusion. This way, + // we also don't include level 1 files multiple times. + files_to_return.extend(overlaps); + + // The remaining level 1 files to possibly include in future iterations are the remaining + // ones that did not overlap with this level 0 file. + remaining_level_1 = non_overlaps; + + // Move the level 0 file into the list of level 0 files to return + level_0_to_return.push(level_0_file); + + // Stop considering level 0 files if the total size of all files is over or equal to + // `max_bytes` + if (total_level_0_bytes + total_level_1_bytes) >= max_bytes { + break; + } + } + + let num_level_0_compacting = level_0_to_return.len(); + let num_level_1_compacting = files_to_return.len(); + + info!( + partition_id = partition_id.get(), + num_level_0_considering, + num_level_1_considering, + num_level_0_compacting, + num_level_1_compacting, + "filtered cold Parquet files for compaction", ); record_file_metrics(
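The two entry points now differ in one way that the tests below exercise: the hot path is capped by both a byte budget and a file-count threshold, while the cold path is capped by bytes alone. A hedged side-by-side call, with illustrative inputs and metric handles:

    // Hot: stops at max_bytes or input_file_count_threshold, whichever hits first.
    let hot = filter_hot_parquet_files(
        files.clone(), max_bytes, input_file_count_threshold, &files_metric, &bytes_metric,
    );
    // Cold: no file-count cap; only the byte budget applies.
    let cold = filter_cold_parquet_files(files, cold_max_bytes, &files_metric, &bytes_metric);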
@@ -246,9 +359,6 @@ mod tests { ); } - const DEFAULT_MAX_FILE_SIZE: u64 = 1024 * 1024 * 10; - const DEFAULT_INPUT_FILE_COUNT: usize = 100; - fn metrics() -> (Metric<U64Gauge>, Metric<U64Gauge>) { let registry = Arc::new(metric::Registry::new()); @@ -268,392 +378,794 @@ ) } - #[test] - fn empty_in_empty_out() { - let parquet_files_for_compaction = ParquetFilesForCompaction { - level_0: vec![], - level_1: vec![], - }; - let (files_metric, bytes_metric) = metrics(); + mod hot { + use super::*; - let files = filter_parquet_files( - parquet_files_for_compaction, - DEFAULT_MAX_FILE_SIZE, - DEFAULT_INPUT_FILE_COUNT, - &files_metric, - &bytes_metric, - ); + const DEFAULT_MAX_FILE_SIZE: u64 = 1024 * 1024 * 10; + const DEFAULT_INPUT_FILE_COUNT: usize = 100; - assert!(files.is_empty(), "Expected empty, got: {:#?}", files); - } + #[test] + fn empty_in_empty_out() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![], + level_1: vec![], + }; + let (files_metric, bytes_metric) = metrics(); - #[test] - fn max_size_0_returns_one_level_0_file() { - let parquet_files_for_compaction = ParquetFilesForCompaction { - level_0: vec![ParquetFileBuilder::level_0().id(1).build()], - level_1: vec![], - }; - let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + DEFAULT_INPUT_FILE_COUNT, + &files_metric, + &bytes_metric, + ); - let files = filter_parquet_files( - parquet_files_for_compaction, - 0, - DEFAULT_INPUT_FILE_COUNT, - &files_metric, - &bytes_metric, - ); + assert!(files.is_empty(), "Expected empty, got: {:#?}", files); + } - assert_eq!(files.len(), 1); - assert_eq!(files[0].id.get(), 1); - } + #[test] + fn max_size_0_returns_one_level_0_file() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ParquetFileBuilder::level_0().id(1).build()], + level_1: vec![], + }; + let (files_metric, bytes_metric) = metrics(); - #[test] - fn max_file_count_0_returns_empty() { - let parquet_files_for_compaction = ParquetFilesForCompaction { - level_0: vec![ParquetFileBuilder::level_0().id(1).build()], - level_1: vec![], - }; - let (files_metric, bytes_metric) =
metrics(); - #[test] - fn max_size_0_returns_one_level_0_file_and_its_level_1_overlaps() { - let parquet_files_for_compaction = ParquetFilesForCompaction { - level_0: vec![ParquetFileBuilder::level_0() - .id(1) - .min_time(200) - .max_time(300) - .build()], - level_1: vec![ - // Too early - ParquetFileBuilder::level_1() - .id(101) - .min_time(1) - .max_time(50) - .build(), - // Completely contains the level 0 times - ParquetFileBuilder::level_1() - .id(102) - .min_time(150) - .max_time(350) - .build(), - // Too late - ParquetFileBuilder::level_1() - .id(103) - .min_time(400) - .max_time(500) - .build(), - ], - }; - let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + 0, + &files_metric, + &bytes_metric, + ); - let files = filter_parquet_files( - parquet_files_for_compaction, - 0, - DEFAULT_INPUT_FILE_COUNT, - &files_metric, - &bytes_metric, - ); + assert!(files.is_empty(), "Expected empty, got: {:#?}", files); + } - assert_eq!(files.len(), 2); - assert_eq!(files[0].id.get(), 102); - assert_eq!(files[1].id.get(), 1); - } - - #[test] - fn returns_only_overlapping_level_1_files_in_order() { - let parquet_files_for_compaction = ParquetFilesForCompaction { - level_0: vec![ - // Level 0 files that overlap in time slightly. - ParquetFileBuilder::level_0() + #[test] + fn max_size_0_returns_one_level_0_file_and_its_level_1_overlaps() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ParquetFileBuilder::level_0() .id(1) .min_time(200) .max_time(300) - .file_size_bytes(10) - .build(), - ParquetFileBuilder::level_0() - .id(2) - .min_time(280) - .max_time(310) - .file_size_bytes(10) - .build(), - ParquetFileBuilder::level_0() - .id(3) - .min_time(309) - .max_time(350) - .file_size_bytes(10) - .build(), - ], - // Level 1 files can be assumed not to overlap each other. - level_1: vec![ - // Does not overlap any level 0, times are too early - ParquetFileBuilder::level_1() - .id(101) - .min_time(1) - .max_time(50) - .file_size_bytes(10) - .build(), - // Overlaps file 1 - ParquetFileBuilder::level_1() - .id(102) - .min_time(199) - .max_time(201) - .file_size_bytes(10) - .build(), - // Overlaps files 1 and 2 - ParquetFileBuilder::level_1() - .id(103) - .min_time(290) + .build()], + level_1: vec![ + // Too early + ParquetFileBuilder::level_1() + .id(101) + .min_time(1) + .max_time(50) + .build(), + // Completely contains the level 0 times + ParquetFileBuilder::level_1() + .id(102) + .min_time(150) + .max_time(350) + .build(), + // Too late + ParquetFileBuilder::level_1() + .id(103) + .min_time(400) + .max_time(500) + .build(), + ], + }; + let (files_metric, bytes_metric) = metrics(); + + let files = filter_hot_parquet_files( + parquet_files_for_compaction, + 0, + DEFAULT_INPUT_FILE_COUNT, + &files_metric, + &bytes_metric, + ); + + assert_eq!(files.len(), 2); + assert_eq!(files[0].id.get(), 102); + assert_eq!(files[1].id.get(), 1); + } + + #[test] + fn returns_only_overlapping_level_1_files_in_order() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ + // Level 0 files that overlap in time slightly. 
+ ParquetFileBuilder::level_0() + .id(1) + .min_time(200) + .max_time(300) + .file_size_bytes(10) + .build(), + ParquetFileBuilder::level_0() + .id(2) + .min_time(280) + .max_time(310) + .file_size_bytes(10) + .build(), + ParquetFileBuilder::level_0() + .id(3) + .min_time(309) + .max_time(350) + .file_size_bytes(10) + .build(), + ], + // Level 1 files can be assumed not to overlap each other. + level_1: vec![ + // Does not overlap any level 0, times are too early + ParquetFileBuilder::level_1() + .id(101) + .min_time(1) + .max_time(50) + .file_size_bytes(10) + .build(), + // Overlaps file 1 + ParquetFileBuilder::level_1() + .id(102) + .min_time(199) + .max_time(201) + .file_size_bytes(10) + .build(), + // Overlaps files 1 and 2 + ParquetFileBuilder::level_1() + .id(103) + .min_time(290) + .max_time(300) + .file_size_bytes(10) + .build(), + // Overlaps file 2 + ParquetFileBuilder::level_1() + .id(104) + .min_time(305) + .max_time(305) + .file_size_bytes(10) + .build(), + // Overlaps files 2 and 3 + ParquetFileBuilder::level_1() + .id(105) + .min_time(308) + .max_time(311) + .file_size_bytes(10) + .build(), + // Overlaps file 3 + ParquetFileBuilder::level_1() + .id(106) + .min_time(340) + .max_time(360) + .file_size_bytes(10) + .build(), + // Does not overlap any level 0, times are too late + ParquetFileBuilder::level_1() + .id(107) + .min_time(390) + .max_time(399) + .file_size_bytes(10) + .build(), + ], + }; + + // Max size 0; only the first level 0 file and its overlapping level 1 files get + // returned + let max_size = 0; + let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction.clone(), + max_size, + DEFAULT_INPUT_FILE_COUNT, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 1]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 1, + level_0_not_selected: 2, + level_1_selected: 2, + level_1_not_selected: 5, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 10, + level_1: 20, + } + ); + + // Increase max size; 1st two level 0 files & their overlapping level 1 files get + // returned + let max_size = 40; + let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction.clone(), + max_size, + DEFAULT_INPUT_FILE_COUNT, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 104, 105, 1, 2]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 2, + level_0_not_selected: 1, + level_1_selected: 4, + level_1_not_selected: 3, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 20, + level_1: 40, + } + ); + + // Increase max size to be exactly equal to the size of the 1st two level 0 files & + // their overlapping level 1 files, which is all that should get returned + let max_size = 60; + let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction.clone(), + max_size, + DEFAULT_INPUT_FILE_COUNT, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 104, 105, 1, 2]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 2, + level_0_not_selected: 1, + level_1_selected: 
4, + level_1_not_selected: 3, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 20, + level_1: 40, + } + ); + + // Increase max size; all level 0 files & their overlapping level 1 files get returned + let max_size = 70; + let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction.clone(), + max_size, + DEFAULT_INPUT_FILE_COUNT, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 104, 105, 106, 1, 2, 3]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 3, + level_0_not_selected: 0, + level_1_selected: 5, + level_1_not_selected: 2, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 30, + level_1: 50, + } + ); + + // Set input_file_count_threshold to 1; the first level 0 file and its overlapping + // level 1 files get returned + let input_file_count_threshold = 1; + let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction.clone(), + DEFAULT_MAX_FILE_SIZE, + input_file_count_threshold, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 1]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 1, + level_0_not_selected: 2, + level_1_selected: 2, + level_1_not_selected: 5, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 10, + level_1: 20, + } + ); + + // Set input_file_count_threshold to 3; still only the first level 0 file and its + // overlapping level 1 files get returned + let input_file_count_threshold = 3; + let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction.clone(), + DEFAULT_MAX_FILE_SIZE, + input_file_count_threshold, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 1]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 1, + level_0_not_selected: 2, + level_1_selected: 2, + level_1_not_selected: 5, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 10, + level_1: 20, + } + ); + + // Set input_file_count_threshold to 4; the first two level 0 files and their + // overlapping level 1 files get returned + let input_file_count_threshold = 4; + let (files_metric, bytes_metric) = metrics(); + let files = filter_hot_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + input_file_count_threshold, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 104, 105, 1, 2]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 2, + level_0_not_selected: 1, + level_1_selected: 4, + level_1_not_selected: 3, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 20, + level_1: 40, + } + ); + } + } + + mod cold { + use super::*; + + const DEFAULT_MAX_FILE_SIZE: u64 = 1024 * 1024 * 10; + + #[test] + fn empty_in_empty_out() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![], + level_1: vec![], + }; + let (files_metric, bytes_metric) = 
metrics(); + + let files = filter_cold_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + &files_metric, + &bytes_metric, + ); + + assert!(files.is_empty(), "Expected empty, got: {:#?}", files); + } + + #[test] + fn max_size_0_returns_one_level_0_file() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ParquetFileBuilder::level_0().id(1).build()], + level_1: vec![], + }; + let (files_metric, bytes_metric) = metrics(); + + let files = filter_cold_parquet_files( + parquet_files_for_compaction, + 0, + &files_metric, + &bytes_metric, + ); + + assert_eq!(files.len(), 1); + assert_eq!(files[0].id.get(), 1); + } + + #[test] + fn one_level_0_file_no_level_1_overlaps() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ParquetFileBuilder::level_0() + .id(1) + .min_time(200) .max_time(300) - .file_size_bytes(10) - .build(), - // Overlaps file 2 - ParquetFileBuilder::level_1() - .id(104) - .min_time(305) - .max_time(305) - .file_size_bytes(10) - .build(), - // Overlaps files 2 and 3 - ParquetFileBuilder::level_1() - .id(105) - .min_time(308) - .max_time(311) - .file_size_bytes(10) - .build(), - // Overlaps file 3 - ParquetFileBuilder::level_1() - .id(106) - .min_time(340) - .max_time(360) - .file_size_bytes(10) - .build(), - // Does not overlap any level 0, times are too late - ParquetFileBuilder::level_1() - .id(107) - .min_time(390) - .max_time(399) - .file_size_bytes(10) - .build(), - ], - }; + .build()], + level_1: vec![ + // Too early + ParquetFileBuilder::level_1() + .id(101) + .min_time(1) + .max_time(50) + .build(), + // Too late + ParquetFileBuilder::level_1() + .id(103) + .min_time(400) + .max_time(500) + .build(), + ], + }; + let (files_metric, bytes_metric) = metrics(); - // Max size 0; only the first level 0 file and its overlapping level 1 files get returned - let max_size = 0; - let (files_metric, bytes_metric) = metrics(); - let files = filter_parquet_files( - parquet_files_for_compaction.clone(), - max_size, - DEFAULT_INPUT_FILE_COUNT, - &files_metric, - &bytes_metric, - ); - let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); - assert_eq!(ids, [102, 103, 1]); - assert_eq!( - extract_file_metrics(&files_metric), - ExtractedFileMetrics { - level_0_selected: 1, - level_0_not_selected: 2, - level_1_selected: 2, - level_1_not_selected: 5, - } - ); - assert_eq!( - extract_byte_metrics(&bytes_metric), - ExtractedByteMetrics { - level_0: 10, - level_1: 20, - } - ); + let files = filter_cold_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + &files_metric, + &bytes_metric, + ); - // Increase max size; 1st two level 0 files & their overlapping level 1 files get returned - let max_size = 40; - let (files_metric, bytes_metric) = metrics(); - let files = filter_parquet_files( - parquet_files_for_compaction.clone(), - max_size, - DEFAULT_INPUT_FILE_COUNT, - &files_metric, - &bytes_metric, - ); - let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); - assert_eq!(ids, [102, 103, 104, 105, 1, 2]); - assert_eq!( - extract_file_metrics(&files_metric), - ExtractedFileMetrics { - level_0_selected: 2, - level_0_not_selected: 1, - level_1_selected: 4, - level_1_not_selected: 3, - } - ); - assert_eq!( - extract_byte_metrics(&bytes_metric), - ExtractedByteMetrics { - level_0: 20, - level_1: 40, - } - ); + assert_eq!(files.len(), 1); + assert_eq!(files[0].id.get(), 1); + } - // Increase max size to be exactly equal to the size of the 1st two level 0 files & their - // overlapping 
level 1 files, which is all that should get returned - let max_size = 60; - let (files_metric, bytes_metric) = metrics(); - let files = filter_parquet_files( - parquet_files_for_compaction.clone(), - max_size, - DEFAULT_INPUT_FILE_COUNT, - &files_metric, - &bytes_metric, - ); - let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); - assert_eq!(ids, [102, 103, 104, 105, 1, 2]); - assert_eq!( - extract_file_metrics(&files_metric), - ExtractedFileMetrics { - level_0_selected: 2, - level_0_not_selected: 1, - level_1_selected: 4, - level_1_not_selected: 3, - } - ); - assert_eq!( - extract_byte_metrics(&bytes_metric), - ExtractedByteMetrics { - level_0: 20, - level_1: 40, - } - ); + #[test] + fn one_level_0_file_with_level_1_overlaps() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ParquetFileBuilder::level_0() + .id(1) + .min_time(200) + .max_time(300) + .build()], + level_1: vec![ + // Too early + ParquetFileBuilder::level_1() + .id(101) + .min_time(1) + .max_time(50) + .build(), + // Completely contains the level 0 times + ParquetFileBuilder::level_1() + .id(102) + .min_time(150) + .max_time(350) + .build(), + // Too late + ParquetFileBuilder::level_1() + .id(103) + .min_time(400) + .max_time(500) + .build(), + ], + }; + let (files_metric, bytes_metric) = metrics(); - // Increase max size; all level 0 files & their overlapping level 1 files get returned - let max_size = 70; - let (files_metric, bytes_metric) = metrics(); - let files = filter_parquet_files( - parquet_files_for_compaction.clone(), - max_size, - DEFAULT_INPUT_FILE_COUNT, - &files_metric, - &bytes_metric, - ); - let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); - assert_eq!(ids, [102, 103, 104, 105, 106, 1, 2, 3]); - assert_eq!( - extract_file_metrics(&files_metric), - ExtractedFileMetrics { - level_0_selected: 3, - level_0_not_selected: 0, - level_1_selected: 5, - level_1_not_selected: 2, - } - ); - assert_eq!( - extract_byte_metrics(&bytes_metric), - ExtractedByteMetrics { - level_0: 30, - level_1: 50, - } - ); + let files = filter_cold_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + &files_metric, + &bytes_metric, + ); - // Set input_file_count_threshold to 1; the first level 0 file and its overlapping level 1 - // files get returned - let input_file_count_threshold = 1; - let (files_metric, bytes_metric) = metrics(); - let files = filter_parquet_files( - parquet_files_for_compaction.clone(), - DEFAULT_MAX_FILE_SIZE, - input_file_count_threshold, - &files_metric, - &bytes_metric, - ); - let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); - assert_eq!(ids, [102, 103, 1]); - assert_eq!( - extract_file_metrics(&files_metric), - ExtractedFileMetrics { - level_0_selected: 1, - level_0_not_selected: 2, - level_1_selected: 2, - level_1_not_selected: 5, - } - ); - assert_eq!( - extract_byte_metrics(&bytes_metric), - ExtractedByteMetrics { - level_0: 10, - level_1: 20, - } - ); + assert_eq!(files.len(), 2); + assert_eq!(files[0].id.get(), 102); + assert_eq!(files[1].id.get(), 1); + } - // Set input_file_count_threshold to 3; still only the first level 0 file and its - // overlapping level 1 files get returned - let input_file_count_threshold = 3; - let (files_metric, bytes_metric) = metrics(); - let files = filter_parquet_files( - parquet_files_for_compaction.clone(), - DEFAULT_MAX_FILE_SIZE, - input_file_count_threshold, - &files_metric, - &bytes_metric, - ); - let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); - assert_eq!(ids, 
[102, 103, 1]); - assert_eq!( - extract_file_metrics(&files_metric), - ExtractedFileMetrics { - level_0_selected: 1, - level_0_not_selected: 2, - level_1_selected: 2, - level_1_not_selected: 5, - } - ); - assert_eq!( - extract_byte_metrics(&bytes_metric), - ExtractedByteMetrics { - level_0: 10, - level_1: 20, - } - ); + #[test] + fn multiple_level_0_files_no_level_1_overlaps() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ + // Level 0 files, some of which overlap in time slightly. + ParquetFileBuilder::level_0() + .id(1) + .min_time(200) + .max_time(300) + .file_size_bytes(10) + .build(), + ParquetFileBuilder::level_0() + .id(2) + .min_time(280) + .max_time(310) + .file_size_bytes(10) + .build(), + ParquetFileBuilder::level_0() + .id(3) + .min_time(320) + .max_time(350) + .file_size_bytes(10) + .build(), + ], + // Level 1 files can be assumed not to overlap each other. + level_1: vec![ + // too early + ParquetFileBuilder::level_1() + .id(101) + .min_time(1) + .max_time(50) + .file_size_bytes(10) + .build(), + // between 2 and 3 (there can't be one between 1 and 2 because they overlap) + ParquetFileBuilder::level_1() + .id(103) + .min_time(315) + .max_time(316) + .file_size_bytes(10) + .build(), + // too late + ParquetFileBuilder::level_1() + .id(107) + .min_time(390) + .max_time(399) + .file_size_bytes(10) + .build(), + ], + }; - // Set input_file_count_threshold to 4; the first two level 0 files and their overlapping - // level 1 files get returned - let input_file_count_threshold = 4; - let (files_metric, bytes_metric) = metrics(); - let files = filter_parquet_files( - parquet_files_for_compaction, - DEFAULT_MAX_FILE_SIZE, - input_file_count_threshold, - &files_metric, - &bytes_metric, - ); - let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); - assert_eq!(ids, [102, 103, 104, 105, 1, 2]); - assert_eq!( - extract_file_metrics(&files_metric), - ExtractedFileMetrics { - level_0_selected: 2, - level_0_not_selected: 1, - level_1_selected: 4, - level_1_not_selected: 3, - } - ); - assert_eq!( - extract_byte_metrics(&bytes_metric), - ExtractedByteMetrics { - level_0: 20, - level_1: 40, - } - ); + // all level 0 files & no level 1 files get returned + let (files_metric, bytes_metric) = metrics(); + let files = filter_cold_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [1, 2, 3]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 3, + level_0_not_selected: 0, + level_1_selected: 0, + level_1_not_selected: 3, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 30, + level_1: 0, + } + ); + } + + #[test] + fn multiple_level_0_files_with_level_1_overlaps() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ + // Level 0 files that overlap in time slightly. + ParquetFileBuilder::level_0() + .id(1) + .min_time(200) + .max_time(300) + .file_size_bytes(10) + .build(), + ParquetFileBuilder::level_0() + .id(2) + .min_time(280) + .max_time(310) + .file_size_bytes(10) + .build(), + ParquetFileBuilder::level_0() + .id(3) + .min_time(309) + .max_time(350) + .file_size_bytes(10) + .build(), + ], + // Level 1 files can be assumed not to overlap each other. 
+ level_1: vec![ + // Does not overlap any level 0, times are too early + ParquetFileBuilder::level_1() + .id(101) + .min_time(1) + .max_time(50) + .file_size_bytes(10) + .build(), + // Overlaps file 1 + ParquetFileBuilder::level_1() + .id(102) + .min_time(199) + .max_time(201) + .file_size_bytes(10) + .build(), + // Overlaps files 1 and 2 + ParquetFileBuilder::level_1() + .id(103) + .min_time(290) + .max_time(300) + .file_size_bytes(10) + .build(), + // Overlaps file 2 + ParquetFileBuilder::level_1() + .id(104) + .min_time(305) + .max_time(305) + .file_size_bytes(10) + .build(), + // Overlaps files 2 and 3 + ParquetFileBuilder::level_1() + .id(105) + .min_time(308) + .max_time(311) + .file_size_bytes(10) + .build(), + // Overlaps file 3 + ParquetFileBuilder::level_1() + .id(106) + .min_time(340) + .max_time(360) + .file_size_bytes(10) + .build(), + // Does not overlap any level 0, times are too late + ParquetFileBuilder::level_1() + .id(107) + .min_time(390) + .max_time(399) + .file_size_bytes(10) + .build(), + ], + }; + + // Max size 0; only the first level 0 file and its overlapping level 1 files get + // returned + let max_size = 0; + let (files_metric, bytes_metric) = metrics(); + let files = filter_cold_parquet_files( + parquet_files_for_compaction.clone(), + max_size, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 1]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 1, + level_0_not_selected: 2, + level_1_selected: 2, + level_1_not_selected: 5, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 10, + level_1: 20, + } + ); + + // Increase max size; 1st two level 0 files & their overlapping level 1 files get + // returned + let max_size = 40; + let (files_metric, bytes_metric) = metrics(); + let files = filter_cold_parquet_files( + parquet_files_for_compaction.clone(), + max_size, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 104, 105, 1, 2]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 2, + level_0_not_selected: 1, + level_1_selected: 4, + level_1_not_selected: 3, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 20, + level_1: 40, + } + ); + + // Increase max size to be exactly equal to the size of the 1st two level 0 files & + // their overlapping level 1 files, which is all that should get returned + let max_size = 60; + let (files_metric, bytes_metric) = metrics(); + let files = filter_cold_parquet_files( + parquet_files_for_compaction.clone(), + max_size, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| f.id.get()).collect(); + assert_eq!(ids, [102, 103, 104, 105, 1, 2]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 2, + level_0_not_selected: 1, + level_1_selected: 4, + level_1_not_selected: 3, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 20, + level_1: 40, + } + ); + + // Increase max size; all level 0 files & their overlapping level 1 files get returned + let (files_metric, bytes_metric) = metrics(); + let files = filter_cold_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + &files_metric, + &bytes_metric, + ); + let ids: Vec<_> = files.iter().map(|f| 
f.id.get()).collect(); + assert_eq!(ids, [102, 103, 104, 105, 106, 1, 2, 3]); + assert_eq!( + extract_file_metrics(&files_metric), + ExtractedFileMetrics { + level_0_selected: 3, + level_0_not_selected: 0, + level_1_selected: 5, + level_1_not_selected: 2, + } + ); + assert_eq!( + extract_byte_metrics(&bytes_metric), + ExtractedByteMetrics { + level_0: 30, + level_1: 50, + } + ); + } } /// Create ParquetFile instances for testing. Only sets fields relevant to the filtering; other diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 1a1ff99b62..7e5c778694 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -420,10 +420,13 @@ impl Config { percentage_max_file_size: 30, split_percentage: 80, max_concurrent_size_bytes: 100_000, + max_cold_concurrent_size_bytes: 90_000, max_number_partitions_per_sequencer: 1, min_number_recent_ingested_files_per_partition: 1, input_size_threshold_bytes: 314_572_800, + cold_input_size_threshold_bytes: 629_145_600, input_file_count_threshold: 100, + hot_multiple: 4, }; let querier_config = QuerierConfig { diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 0d4162dd68..ac6e1403c4 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -552,10 +552,12 @@ pub trait ParquetFileRepo: Send + Sync { num_partitions: usize, ) -> Result<Vec<PartitionParam>>; - /// List partitions with the most level 0 files for a given sequencer + /// List partitions with the most level 0 files created earlier than `older_than_num_hours` + /// hours ago for a given sequencer. In other words, "cold" partitions that need compaction. async fn most_level_0_files_partitions( &mut self, sequencer_id: SequencerId, + older_than_num_hours: u32, num_partitions: usize, ) -> Result<Vec<PartitionParam>>; @@ -2686,17 +2688,25 @@ pub(crate) mod test_helpers { .await .unwrap(); + let time_five_hour_ago = Timestamp::new( + (catalog.time_provider().now() - Duration::from_secs(60 * 60 * 5)).timestamp_nanos(), + ); + let time_38_hour_ago = Timestamp::new( + (catalog.time_provider().now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(), + ); + + let older_than = 24; let num_partitions = 2; // Db has no partition let partitions = repos .parquet_files() - .most_level_0_files_partitions(sequencer.id, num_partitions) + .most_level_0_files_partitions(sequencer.id, older_than, num_partitions) .await .unwrap(); assert!(partitions.is_empty()); - // The DB has 1 partition but it does not have any file + // The DB has 1 partition but it does not have any files let partition = repos .partitions() .create_or_get("one".into(), sequencer.id, table.id) @@ -2704,7 +2714,7 @@ pub(crate) mod test_helpers { .unwrap(); let partitions = repos .parquet_files() - .most_level_0_files_partitions(sequencer.id, num_partitions) + .most_level_0_files_partitions(sequencer.id, older_than, num_partitions) .await .unwrap(); assert!(partitions.is_empty()); @@ -2722,7 +2732,7 @@ file_size_bytes: 1337, row_count: 0, compaction_level: CompactionLevel::Initial, - created_at: Timestamp::new(1), + created_at: time_38_hour_ago, column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), }; let delete_l0_file = repos @@ -2737,12 +2747,42 @@ .unwrap(); let partitions = repos .parquet_files() - .most_level_0_files_partitions(sequencer.id, num_partitions) + .most_level_0_files_partitions(sequencer.id, older_than, num_partitions) .await .unwrap();
assert!(partitions.is_empty()); - // The partition has one non-deleted level 0 files + // A partition with one cold file and one hot file + let hot_partition = repos + .partitions() + .create_or_get("hot".into(), sequencer.id, table.id) + .await + .unwrap(); + let cold_file_params = ParquetFileParams { + object_store_id: Uuid::new_v4(), + partition_id: hot_partition.id, + ..parquet_file_params.clone() + }; + repos + .parquet_files() + .create(cold_file_params) + .await + .unwrap(); + let hot_file_params = ParquetFileParams { + object_store_id: Uuid::new_v4(), + partition_id: hot_partition.id, + created_at: time_five_hour_ago, + ..parquet_file_params.clone() + }; + repos.parquet_files().create(hot_file_params).await.unwrap(); + let partitions = repos + .parquet_files() + .most_level_0_files_partitions(sequencer.id, older_than, num_partitions) + .await + .unwrap(); + assert!(partitions.is_empty()); + + // The partition has one non-deleted level 0 file let l0_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), ..parquet_file_params.clone() @@ -2754,12 +2794,12 @@ .unwrap(); let partitions = repos .parquet_files() - .most_level_0_files_partitions(sequencer.id, num_partitions) + .most_level_0_files_partitions(sequencer.id, older_than, num_partitions) .await .unwrap(); assert_eq!(partitions.len(), 1); - // The DB has 2 partitions both has non-deleled L0 files + // The DB has 2 partitions; both have non-deleted L0 files let another_partition = repos .partitions() .create_or_get("two".into(), sequencer.id, table.id) @@ -2788,7 +2828,7 @@ // Must return 2 partitions let partitions = repos .parquet_files() - .most_level_0_files_partitions(sequencer.id, num_partitions) + .most_level_0_files_partitions(sequencer.id, older_than, num_partitions) .await .unwrap(); assert_eq!(partitions.len(), 2); @@ -2815,11 +2855,11 @@ // Still return 2 partitions the limit num_partitions=2 let partitions = repos .parquet_files() - .most_level_0_files_partitions(sequencer.id, num_partitions) + .most_level_0_files_partitions(sequencer.id, older_than, num_partitions) .await .unwrap(); assert_eq!(partitions.len(), 2); - // and the first one should stil be the one with the most L0 files + // and the first one should still be the one with the most L0 files assert_eq!(partitions[0].partition_id, another_partition.id); }
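For callers, the new parameter reads as an age cutoff on a partition's newest level 0 file. A hedged sketch of the compactor-side call, where `n` is illustrative and the 24-hour value mirrors `older_than` in the tests above:

    // Sketch: fetch up to `n` partitions per sequencer whose newest level 0
    // file was created more than 24 hours ago, i.e. partitions that went cold.
    let cold_candidates = repos
        .parquet_files()
        .most_level_0_files_partitions(sequencer_id, 24, n)
        .await?;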
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index cb9231cad4..e889aecfae 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -1169,10 +1169,16 @@ impl ParquetFileRepo for MemTxn { async fn most_level_0_files_partitions( &mut self, sequencer_id: SequencerId, + older_than_num_hours: u32, num_partitions: usize, ) -> Result<Vec<PartitionParam>> { + let time_nano = (self.time_provider.now() + - Duration::from_secs(60 * 60 * older_than_num_hours as u64)) + .timestamp_nanos(); + let older_than = Timestamp::new(time_nano); + let stage = self.stage(); - let partitions = stage + let relevant_parquet_files = stage .parquet_files .iter() .filter(|f| { @@ -1180,30 +1186,44 @@ impl ParquetFileRepo for MemTxn { && f.compaction_level == CompactionLevel::Initial && f.to_delete.is_none() }) - .map(|pf| PartitionParam { - partition_id: pf.partition_id, - sequencer_id: pf.sequencer_id, - namespace_id: pf.namespace_id, - table_id: pf.table_id, - }) .collect::<Vec<_>>(); // Count num of files per partition by simply count the number of partition duplicates let mut partition_duplicate_count: HashMap<PartitionParam, i32> = HashMap::with_capacity(partitions.len()); - for p in partitions { - let count = partition_duplicate_count.entry(p).or_insert(0); + HashMap::with_capacity(relevant_parquet_files.len()); + let mut partition_max_created_at = HashMap::with_capacity(relevant_parquet_files.len()); + for pf in relevant_parquet_files { + let key = PartitionParam { + partition_id: pf.partition_id, + sequencer_id: pf.sequencer_id, + namespace_id: pf.namespace_id, + table_id: pf.table_id, + }; + let count = partition_duplicate_count.entry(key).or_insert(0); *count += 1; + let max_created_at = partition_max_created_at.entry(key).or_insert(pf.created_at); + if pf.created_at > *max_created_at { + *max_created_at = pf.created_at; + } } - // Sort partitions by file count - let mut partitions = partition_duplicate_count.iter().collect::<Vec<_>>(); + // Sort partitions whose max created at is older than the limit by their file count + let mut partitions = partition_duplicate_count + .iter() + .filter(|(k, _v)| partition_max_created_at.get(k).unwrap() < &older_than) + .collect::<Vec<_>>(); partitions.sort_by(|a, b| b.1.cmp(a.1)); // Return top partitions with most file counts let partitions = partitions .into_iter() .map(|(k, _)| *k) + .map(|pf| PartitionParam { + partition_id: pf.partition_id, + sequencer_id: pf.sequencer_id, + namespace_id: pf.namespace_id, + table_id: pf.table_id, + }) .take(num_partitions) .collect::<Vec<_>>(); diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index d97a5c2eb6..46c5732ca9 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -280,7 +280,7 @@ decorate!( "parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>; "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>; "recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, sequencer_id: SequencerId, num_hours: u32, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>; - "most_level_0_files_partitions" = most_level_0_files_partitions(&mut self, sequencer_id: SequencerId, num_partitions: usize) -> Result<Vec<PartitionParam>>; + "most_level_0_files_partitions" = most_level_0_files_partitions(&mut self, sequencer_id: SequencerId, older_than_num_hours: u32, num_partitions: usize) -> Result<Vec<PartitionParam>>; ] ); diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 5a78f1717d..405562ef03 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1695,14 +1695,14 @@ WHERE parquet_file.sequencer_id = $1 sqlx::query_as::<_, PartitionParam>( r#" SELECT partition_id, sequencer_id, namespace_id, table_id, count(id) -FROM parquet_file +FROM parquet_file WHERE compaction_level = 0 and to_delete is null and sequencer_id = $1 and to_timestamp(created_at/1000000000) > now() - ($2 || 'hour')::interval group by 1, 2, 3, 4 having count(id) >= $3 order by 5 DESC -limit $4; +limit $4; "#, ) .bind(&sequencer_id) // $1 @@ -1717,25 +1717,30 @@ limit $4; async fn most_level_0_files_partitions( &mut self, sequencer_id: SequencerId, + older_than_num_hours: u32, num_partitions: usize, ) -> Result<Vec<PartitionParam>> { + let older_than_num_hours = older_than_num_hours as i32; let num_partitions = num_partitions as i32; // The preliminary performance test says this query runs around 50ms // We have index on (sequencer_id, comapction_level, to_delete) sqlx::query_as::<_, PartitionParam>( r#" -SELECT partition_id, sequencer_id, namespace_id, table_id,
count(id) -FROM parquet_file -WHERE compaction_level = 0 and to_delete is null - and sequencer_id = $1 -group by 1, 2, 3, 4 -order by 5 DESC -limit $2; +SELECT partition_id, sequencer_id, namespace_id, table_id, count(id), max(created_at) +FROM parquet_file +WHERE compaction_level = 0 +AND to_delete IS NULL +AND sequencer_id = $1 +GROUP BY 1, 2, 3, 4 +HAVING to_timestamp(max(created_at)/1000000000) < now() - ($2 || 'hour')::interval +ORDER BY 5 DESC +LIMIT $3; "#, ) .bind(&sequencer_id) // $1 - .bind(&num_partitions) // $2 + .bind(&older_than_num_hours) // $2 + .bind(&num_partitions) // $3 .fetch_all(&mut self.inner) .await .map_err(|e| Error::SqlxError { source: e }) diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index dbdbcfa16e..3f30e3e1a3 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -168,10 +168,13 @@ pub async fn create_compactor_server_type( compactor_config.percentage_max_file_size, compactor_config.split_percentage, compactor_config.max_concurrent_size_bytes, + compactor_config.max_cold_concurrent_size_bytes, compactor_config.max_number_partitions_per_sequencer, compactor_config.min_number_recent_ingested_files_per_partition, compactor_config.input_size_threshold_bytes, + compactor_config.cold_input_size_threshold_bytes, compactor_config.input_file_count_threshold, + compactor_config.hot_multiple, ); let compactor_handler = Arc::new(CompactorHandlerImpl::new( sequencers,