diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 0ca6d09b49..2bf1f6e876 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -918,7 +918,7 @@ pub mod tests {
         let pf1 = txn.parquet_files().create(p1.clone()).await.unwrap();
         txn.parquet_files().flag_for_delete(pf1.id).await.unwrap();
         //
-        // partition2 has a cold L2 file created recently (default 9 hours ago) --> a cold candidate
+        // partition2 has only an L2 file created recently (default 9 hours ago) --> not a candidate
         let p2 = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             partition_id: partition2.id,
@@ -927,23 +927,13 @@
         };
         let _pf2 = txn.parquet_files().create(p2).await.unwrap();
         txn.commit().await.unwrap();
-        // 1 candidate becasue we limit it
-        let candidates = compactor
-            .partitions_to_compact(compaction_type, vec![hour_threshold], 1)
-            .await
-            .unwrap();
-        assert_eq!(candidates.len(), 1);
-        // 2 candidates because we do not limit it
+        // only partition 1 is returned as a candidate
         let candidates = compactor
             .partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions)
             .await
             .unwrap();
-        assert_eq!(candidates.len(), 2);
-        // sort candidates on partition_id
-        let mut candidates = candidates.into_iter().collect::<Vec<_>>();
-        candidates.sort_by_key(|c| c.candidate.partition_id);
+        assert_eq!(candidates.len(), 1);
         assert_eq!(candidates[0].candidate.partition_id, partition1.id);
-        assert_eq!(candidates[1].candidate.partition_id, partition2.id);
         //
         // verify no candidates will actually get cold compaction because they are not old enough
         let files = parquet_file_lookup::ParquetFilesForCompaction::for_partition(
@@ -954,15 +944,6 @@
         .await
         .unwrap();
         assert!(files.is_none());
-        //
-        let files = parquet_file_lookup::ParquetFilesForCompaction::for_partition(
-            Arc::clone(&compactor),
-            Arc::clone(&candidates[1]),
-            CompactionType::Cold,
-        )
-        .await
-        .unwrap();
-        assert!(files.is_none());
         // --------------------------------------
         // Case 3: no new recent writes (within the last 8 hours) --> return that partition
         let p3 = ParquetFileParams {
@@ -977,7 +958,7 @@
         let _pf3 = txn.parquet_files().create(p3).await.unwrap();
         txn.commit().await.unwrap();
         //
-        // Still return 2 candidates
+        // Return 2 candidates
         let candidates = compactor
             .partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions)
             .await
             .unwrap();
@@ -1208,5 +1189,15 @@
         assert_eq!(candidates[3].candidate.partition_id, partition4.id);
         assert_eq!(candidates[4].candidate.partition_id, partition5.id);
         assert_eq!(candidates[5].candidate.partition_id, another_partition.id);
+
+        // --------------------------------------
+        // Test the limit on the number of partitions to compact
+        let candidates = compactor
+            .partitions_to_compact(compaction_type, vec![hour_threshold], 3)
+            .await
+            .unwrap();
+        // Since we do not prioritize partitions (for now), we only need to check the number of candidates.
+        // The goal is to keep this limit high so that we do not have to think about prioritization
+        assert_eq!(candidates.len(), 3);
     }
 }
diff --git a/iox_catalog/migrations/20230113075522_modify_trigger_new_file_at.sql b/iox_catalog/migrations/20230113075522_modify_trigger_new_file_at.sql
new file mode 100644
index 0000000000..caa8472ff6
--- /dev/null
+++ b/iox_catalog/migrations/20230113075522_modify_trigger_new_file_at.sql
@@ -0,0 +1,16 @@
+-- FUNCTION that updates the new_file_at field in the partition table when the update_partition trigger is fired
+-- The field new_file_at signals when the last file was added to the partition for compaction. However,
+-- since compaction level 2 is the final stage, its creation should not signal further compaction
+
+CREATE OR REPLACE FUNCTION update_partition_on_new_file_at()
+RETURNS TRIGGER
+LANGUAGE PLPGSQL
+AS $$
+BEGIN
+    IF NEW.compaction_level < 2 THEN
+        UPDATE partition SET new_file_at = NEW.created_at WHERE id = NEW.partition_id;
+    END IF;
+
+    RETURN NEW;
+END;
+$$;
\ No newline at end of file
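The literal 2 in the trigger body mirrors CompactionLevel::Final on the Rust side (see the mem.rs change below). A minimal sketch of that mapping, assuming the three-level scheme used by the catalog — only Final actually appears in this diff, the other variant names are taken from the wider codebase:

    // Sketch: compaction levels as stored in the catalog's compaction_level column.
    // Ord is derived so that a guard like `level < CompactionLevel::Final` works.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    pub enum CompactionLevel {
        /// Freshly ingested file, not yet compacted
        Initial = 0,
        /// Compacted file with no overlaps at its level
        FileNonOverlapped = 1,
        /// Final level; creating such a file should not re-signal compaction
        Final = 2,
    }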
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 4dcf6acd09..cbfaaf3379 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -634,15 +634,6 @@ pub trait ParquetFileRepo: Send + Sync {
         max_time: Timestamp,
     ) -> Result<Vec<ParquetFile>>;
 
-    // Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518
-    /// Select partition for cold/warm/hot compaction
-    /// These are partitions with files created recently (aka created after the specified time_in_the_past)
-    /// These files include all levels of compaction files and both non-deleted and soft-deleted files
-    async fn partitions_with_recent_created_files(
-        &mut self,
-        time_in_the_past: Timestamp,
-    ) -> Result<Vec<PartitionId>>;
-
     // Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518
     /// List the most recent highest throughput partition for a given shard, if specified
     async fn recent_highest_throughput_partitions(
         &mut self,
         shard_id: Option<ShardId>,
@@ -3837,12 +3828,6 @@ pub(crate) mod test_helpers {
         let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5));
 
         // Db has no partition
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
-        assert!(partitions.is_empty());
         // get from partition table
         let partitions = repos
             .partitions()
@@ -3859,12 +3844,6 @@
             .create_or_get("one".into(), shard.id, table.id)
             .await
             .unwrap();
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
-        assert!(partitions.is_empty());
         // get from partition table
         let partitions = repos
             .partitions()
@@ -3902,13 +3881,6 @@
             .flag_for_delete(delete_l0_file.id)
             .await
             .unwrap();
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
-        // still empty becasue the file was not recently created
-        assert!(partitions.is_empty());
         // get from partition table
         let partitions = repos
             .partitions()
@@ -3928,14 +3900,7 @@
             .create(l0_one_hour_ago_file_params.clone())
             .await
             .unwrap();
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
         // partition one should be returned
-        assert_eq!(partitions.len(), 1);
-        assert!(partitions.contains(&partition1.id));
         // get from partition table
         let partitions = repos
             .partitions()
@@ -3953,14 +3918,7 @@
             .create_or_get("two".into(), shard.id, table.id)
             .await
             .unwrap();
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
         // should return partition one only
-        assert_eq!(partitions.len(), 1);
-        assert!(partitions.contains(&partition1.id));
         // get from partition table
         let partitions = repos
             .partitions()
@@ -3982,14 +3940,7 @@
             .create(l0_five_hour_ago_file_params.clone())
             .await
             .unwrap();
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
         // still return partition one only
-        assert_eq!(partitions.len(), 1);
-        assert!(partitions.contains(&partition1.id));
         // get from partition table
         let partitions = repos
             .partitions()
@@ -4012,15 +3963,7 @@
             .create(l1_file_params.clone())
             .await
             .unwrap();
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
         // should return both partitions
-        assert_eq!(partitions.len(), 2);
-        assert!(partitions.contains(&partition1.id));
-        assert!(partitions.contains(&partition2.id));
         // get from partition table
         let partitions = repos
             .partitions()
@@ -4042,15 +3985,7 @@
             .create_or_get("three".into(), shard.id, table.id)
             .await
             .unwrap();
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
         // should return partition one and two only
-        assert_eq!(partitions.len(), 2);
-        assert!(partitions.contains(&partition1.id));
-        assert!(partitions.contains(&partition2.id));
         // get from partition table
         let partitions = repos
             .partitions()
@@ -4064,7 +3999,35 @@
         assert_eq!(partitions[0].partition_id, partition1.id);
         assert_eq!(partitions[1].partition_id, partition2.id);
 
-        // add an L0 file created recently (one hour ago)
+        // Add an L2 file created recently (just now) for partition three
+        // Since it is L2, the partition won't get updated
+        let l2_file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            created_at: time_now,
+            partition_id: partition3.id,
+            compaction_level: CompactionLevel::Final,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(l2_file_params.clone())
+            .await
+            .unwrap();
+        // still should return partition one and two only
+        // get from partition table
+        let partitions = repos
+            .partitions()
+            .partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 2);
+        // sort by partition id
+        let mut partitions = partitions;
+        partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
+        assert_eq!(partitions[0].partition_id, partition1.id);
+        assert_eq!(partitions[1].partition_id, partition2.id);
+
+        // add an L0 file created recently (one hour ago) for partition three
         let l0_one_hour_ago_file_params = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             created_at: time_one_hour_ago,
@@ -4076,16 +4039,7 @@
             .create(l0_one_hour_ago_file_params.clone())
             .await
             .unwrap();
-        let partitions = repos
-            .parquet_files()
-            .partitions_with_recent_created_files(time_two_hour_ago)
-            .await
-            .unwrap();
         // should return all partitions
-        assert_eq!(partitions.len(), 3);
-        assert!(partitions.contains(&partition1.id));
-        assert!(partitions.contains(&partition2.id));
-        assert!(partitions.contains(&partition3.id));
         // get from partition table
         let partitions = repos
             .partitions()
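The removed trait method's replacement lives on the partition repo and additionally takes a limit, as the updated tests above show. A minimal usage sketch, assuming the same repos/catalog handles as in the tests (the 8-hour cut-off is illustrative only):

    // Candidates are now read from partition.new_file_at instead of scanning parquet_file.
    let time_in_the_past = Timestamp::from(catalog.time_provider().hours_ago(8));
    let candidates = repos
        .partitions()
        .partitions_with_recent_created_files(time_in_the_past, max_num_partitions)
        .await
        .unwrap();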
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index b772fdb447..01e454df1c 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -1195,15 +1195,19 @@ impl ParquetFileRepo for MemTxn {
             created_at,
             column_set,
         };
+        let compaction_level = parquet_file.compaction_level;
         stage.parquet_files.push(parquet_file);
 
         // Update the new_file_at field of its partition to the time of created_at
-        let partition = stage
-            .partitions
-            .iter_mut()
-            .find(|p| p.id == partition_id)
-            .ok_or(Error::PartitionNotFound { id: partition_id })?;
-        partition.new_file_at = Some(created_at);
+        // Only update it if the compaction level is not Final; a non-final file signals that more compaction is needed
+        if compaction_level < CompactionLevel::Final {
+            let partition = stage
+                .partitions
+                .iter_mut()
+                .find(|p| p.id == partition_id)
+                .ok_or(Error::PartitionNotFound { id: partition_id })?;
+            partition.new_file_at = Some(created_at);
+        }
 
         Ok(stage.parquet_files.last().unwrap().clone())
     }
@@ -1359,20 +1363,6 @@ impl ParquetFileRepo for MemTxn {
             .cloned()
             .collect())
     }
 
-    async fn partitions_with_recent_created_files(
-        &mut self,
-        time_in_the_past: Timestamp,
-    ) -> Result<Vec<PartitionId>> {
-        let stage = self.stage();
-
-        let partitions: Vec<_> = stage
-            .parquet_files
-            .iter()
-            .filter(|f| f.created_at > time_in_the_past)
-            .map(|f| f.partition_id)
-            .collect();
-        Ok(partitions)
-    }
-
     async fn recent_highest_throughput_partitions(
         &mut self,
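The PartitionRepo counterpart is not part of this diff; a rough sketch of what the in-memory version plausibly does, filtering on the new_file_at value that create() above now maintains (the function name, return type, and limit handling are all assumptions):

    // Hypothetical helper: partitions whose most recent compaction-relevant file
    // is newer than the cut-off, capped at max_num_partitions.
    fn partitions_with_recent_files(
        partitions: &[Partition],
        time_in_the_past: Timestamp,
        max_num_partitions: usize,
    ) -> Vec<PartitionId> {
        partitions
            .iter()
            .filter(|p| p.new_file_at.map_or(false, |t| t > time_in_the_past))
            .map(|p| p.id)
            .take(max_num_partitions)
            .collect()
    }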
diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs
index 3251f60bfb..f5da437b1e 100644
--- a/iox_catalog/src/metrics.rs
+++ b/iox_catalog/src/metrics.rs
@@ -289,7 +289,6 @@ decorate!(
         "parquet_count_by_overlaps_with_level_0" = count_by_overlaps_with_level_0(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
         "parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>;
         "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
-        "partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp) -> Result<Vec<PartitionId>>;
         "recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
         "parquet_partitions_with_small_l1_file_count" = partitions_with_small_l1_file_count(&mut self, shard_id: Option<ShardId>, small_size_threshold_bytes: i64, min_small_file_count: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
         "most_cold_files_partitions" = most_cold_files_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, num_partitions: usize) -> Result<Vec<PartitionParam>>;
diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index 7364394815..a0e34492f6 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -1897,23 +1897,6 @@ WHERE parquet_file.shard_id = $1
         .map_err(|e| Error::SqlxError { source: e })
     }
 
-    async fn partitions_with_recent_created_files(
-        &mut self,
-        time_in_the_past: Timestamp,
-    ) -> Result<Vec<PartitionId>> {
-        sqlx::query_as::<_, PartitionId>(
-            r#"
-SELECT distinct partition_id
-FROM parquet_file
-WHERE created_at > $1;
-            "#,
-        )
-        .bind(time_in_the_past) // $1
-        .fetch_all(&mut self.inner)
-        .await
-        .map_err(|e| Error::SqlxError { source: e })
-    }
-
     async fn recent_highest_throughput_partitions(
         &mut self,
         shard_id: Option<ShardId>,
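For contrast with the removed parquet_file scan, the partition-table query that replaces it (implemented elsewhere, not in this diff) can follow the same sqlx pattern; the exact SELECT list and the limit binding here are assumptions:

    // Hypothetical replacement: read candidates straight from partition.new_file_at,
    // which the trigger above keeps current for non-final files.
    sqlx::query_as::<_, PartitionId>(
        r#"
    SELECT id AS partition_id
    FROM partition
    WHERE new_file_at > $1
    LIMIT $2;
        "#,
    )
    .bind(time_in_the_past) // $1
    .bind(num_partitions as i64) // $2
    .fetch_all(&mut self.inner)
    .await
    .map_err(|e| Error::SqlxError { source: e })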