From 3199d65c2fe63efe9515171a7544ec9deff97456 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 31 Mar 2023 16:16:17 -0400 Subject: [PATCH] feat: Add the ability to specify a max threshold duration on CatalogToCompactPartitionsSource --- compactor2/src/components/hardcoded.rs | 1 + .../partitions_source/catalog_to_compact.rs | 84 ++++++++++++++++--- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 1af1150d9c..98c3501782 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -130,6 +130,7 @@ fn make_partitions_source_commit_partition_sink( config.backoff_config.clone(), Arc::clone(&config.catalog), config.partition_threshold, + None, // Recent writes is `partition_threshold` ago to now Arc::clone(&config.time_provider), )) } diff --git a/compactor2/src/components/partitions_source/catalog_to_compact.rs b/compactor2/src/components/partitions_source/catalog_to_compact.rs index cec56ff7f2..e516e08ccc 100644 --- a/compactor2/src/components/partitions_source/catalog_to_compact.rs +++ b/compactor2/src/components/partitions_source/catalog_to_compact.rs @@ -9,11 +9,20 @@ use iox_time::TimeProvider; use super::PartitionsSource; #[derive(Debug)] -/// Returns all partitions that had a new parquet file written more than `threshold` ago. +/// Returns all partitions that had a new Parquet file written after a lower bound of the current +/// time minus `min_threshold` and optionally limited only to those with Parquet files written +/// before the current time minus `max_threshold`. +/// +/// If `max_threshold` is not specified, the upper bound is effectively the current time. +/// +/// If `max_threshold` is specified, it must be less than `min_threshold` so that when computing +/// the range endpoints as `(now - min_threshold, now - max_threshold)`, the lower bound is lower +/// than the upper bound. pub struct CatalogToCompactPartitionsSource { backoff_config: BackoffConfig, catalog: Arc, - threshold: Duration, + min_threshold: Duration, + max_threshold: Option, time_provider: Arc, } @@ -21,13 +30,15 @@ impl CatalogToCompactPartitionsSource { pub fn new( backoff_config: BackoffConfig, catalog: Arc, - threshold: Duration, + min_threshold: Duration, + max_threshold: Option, time_provider: Arc, ) -> Self { Self { backoff_config, catalog, - threshold, + min_threshold, + max_threshold, time_provider, } } @@ -42,7 +53,8 @@ impl Display for CatalogToCompactPartitionsSource { #[async_trait] impl PartitionsSource for CatalogToCompactPartitionsSource { async fn fetch(&self) -> Vec { - let cutoff = self.time_provider.now() - self.threshold; + let minimum_time = self.time_provider.now() - self.min_threshold; + let maximum_time = self.max_threshold.map(|max| self.time_provider.now() - max); Backoff::new(&self.backoff_config) .retry_all_errors("partitions_to_compact", || async { @@ -50,7 +62,7 @@ impl PartitionsSource for CatalogToCompactPartitionsSource { .repositories() .await .partitions() - .partitions_new_file_between(cutoff.into(), None) + .partitions_new_file_between(minimum_time.into(), maximum_time.map(Into::into)) .await }) .await @@ -66,16 +78,22 @@ mod tests { use iox_tests::PartitionBuilder; fn partition_ids(ids: &[i64]) -> Vec { - ids.iter().cloned().map(|id| PartitionId::new(id)).collect() + ids.iter().cloned().map(PartitionId::new).collect() } - async fn fetch_test(catalog: Arc, min_threshold: Duration, expected_ids: &[i64]) { + async fn fetch_test( + catalog: Arc, + min_threshold: Duration, + max_threshold: Option, + expected_ids: &[i64], + ) { let time_provider = catalog.time_provider(); let partitions_source = CatalogToCompactPartitionsSource::new( Default::default(), catalog, min_threshold, + max_threshold, time_provider, ); @@ -85,7 +103,8 @@ mod tests { assert_eq!( actual_partition_ids, partition_ids(expected_ids), - "CatalogToCompact source with min_threshold {min_threshold:?} failed", + "CatalogToCompact source with min_threshold {min_threshold:?} and \ + max_threshold {max_threshold:?} failed", ); } @@ -108,12 +127,53 @@ mod tests { } let one_minute = Duration::from_secs(60); - fetch_test(Arc::clone(&catalog), one_minute, &[]).await; + fetch_test(Arc::clone(&catalog), one_minute, None, &[]).await; let four_hours = Duration::from_secs(60 * 60 * 4); - fetch_test(Arc::clone(&catalog), four_hours, &[1]).await; + fetch_test(Arc::clone(&catalog), four_hours, None, &[1]).await; let seven_hours = Duration::from_secs(60 * 60 * 7); - fetch_test(Arc::clone(&catalog), seven_hours, &[1, 2]).await; + fetch_test(Arc::clone(&catalog), seven_hours, None, &[1, 2]).await; + } + + #[tokio::test] + async fn max_specified() { + let catalog = Arc::new(MemCatalog::new(Default::default())); + let time_provider = catalog.time_provider(); + + let time_now = Timestamp::from(time_provider.now()); + let time_three_hour_ago = Timestamp::from(time_provider.hours_ago(3)); + let time_six_hour_ago = Timestamp::from(time_provider.hours_ago(6)); + + for (id, time) in [ + (1, time_now), + (2, time_three_hour_ago), + (3, time_six_hour_ago), + ] + .iter() + .cloned() + { + let partition = PartitionBuilder::new(id as i64) + .with_new_file_at(time) + .build(); + catalog.add_partition(partition).await; + } + + let one_minute = Duration::from_secs(60); + let one_hour = Duration::from_secs(60 * 60); + let four_hours = Duration::from_secs(60 * 60 * 4); + let seven_hours = Duration::from_secs(60 * 60 * 7); + + fetch_test(Arc::clone(&catalog), seven_hours, Some(four_hours), &[3]).await; + + fetch_test(Arc::clone(&catalog), seven_hours, Some(one_hour), &[2, 3]).await; + + fetch_test(Arc::clone(&catalog), seven_hours, Some(one_minute), &[2, 3]).await; + + fetch_test(Arc::clone(&catalog), four_hours, Some(one_hour), &[2]).await; + + fetch_test(Arc::clone(&catalog), four_hours, Some(one_minute), &[2]).await; + + fetch_test(Arc::clone(&catalog), one_hour, Some(one_minute), &[]).await; } }