From a244e5b0786a776e2b1da874624731be5e0333fa Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 31 Mar 2023 16:13:59 -0400 Subject: [PATCH 1/2] test: Add some tests for CatalogToCompactPartitionsSource's existing behavior --- .../partitions_source/catalog_to_compact.rs | 60 +++++++++++++++++++ iox_catalog/src/mem.rs | 7 +++ iox_tests/src/builders.rs | 9 ++- 3 files changed, 75 insertions(+), 1 deletion(-) diff --git a/compactor2/src/components/partitions_source/catalog_to_compact.rs b/compactor2/src/components/partitions_source/catalog_to_compact.rs index 0704822da6..cec56ff7f2 100644 --- a/compactor2/src/components/partitions_source/catalog_to_compact.rs +++ b/compactor2/src/components/partitions_source/catalog_to_compact.rs @@ -57,3 +57,63 @@ impl PartitionsSource for CatalogToCompactPartitionsSource { .expect("retry forever") } } + +#[cfg(test)] +mod tests { + use super::*; + use data_types::Timestamp; + use iox_catalog::mem::MemCatalog; + use iox_tests::PartitionBuilder; + + fn partition_ids(ids: &[i64]) -> Vec { + ids.iter().cloned().map(|id| PartitionId::new(id)).collect() + } + + async fn fetch_test(catalog: Arc, min_threshold: Duration, expected_ids: &[i64]) { + let time_provider = catalog.time_provider(); + + let partitions_source = CatalogToCompactPartitionsSource::new( + Default::default(), + catalog, + min_threshold, + time_provider, + ); + + let mut actual_partition_ids = partitions_source.fetch().await; + actual_partition_ids.sort(); + + assert_eq!( + actual_partition_ids, + partition_ids(expected_ids), + "CatalogToCompact source with min_threshold {min_threshold:?} failed", + ); + } + + #[tokio::test] + async fn no_max_specified() { + let catalog = Arc::new(MemCatalog::new(Default::default())); + let time_provider = catalog.time_provider(); + + let time_three_hour_ago = Timestamp::from(time_provider.hours_ago(3)); + let time_six_hour_ago = Timestamp::from(time_provider.hours_ago(6)); + + for (id, time) in [(1, time_three_hour_ago), (2, time_six_hour_ago)] + .iter() + .cloned() + { + let partition = PartitionBuilder::new(id as i64) + .with_new_file_at(time) + .build(); + catalog.add_partition(partition).await; + } + + let one_minute = Duration::from_secs(60); + fetch_test(Arc::clone(&catalog), one_minute, &[]).await; + + let four_hours = Duration::from_secs(60 * 60 * 4); + fetch_test(Arc::clone(&catalog), four_hours, &[1]).await; + + let seven_hours = Duration::from_secs(60 * 60 * 7); + fetch_test(Arc::clone(&catalog), seven_hours, &[1, 2]).await; + } +} diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 146858285f..69d3c702be 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -48,6 +48,13 @@ impl MemCatalog { time_provider: Arc::new(SystemProvider::new()), } } + + /// Add partition directly, for testing purposes only as it does not do any consistency or + /// uniqueness checks + pub async fn add_partition(&self, partition: Partition) { + let mut collections = Arc::clone(&self.collections).lock_owned().await; + collections.partitions.push(partition); + } } impl std::fmt::Debug for MemCatalog { diff --git a/iox_tests/src/builders.rs b/iox_tests/src/builders.rs index f840415c84..fdef83ae8b 100644 --- a/iox_tests/src/builders.rs +++ b/iox_tests/src/builders.rs @@ -144,7 +144,7 @@ impl TableBuilder { } #[derive(Debug)] -/// Builds [`Partition`]s for testing +/// Builds [`Partition`]s for testing pub struct PartitionBuilder { partition: Partition, } @@ -165,6 +165,13 @@ impl PartitionBuilder { } } + /// Set the `new_file_at` attribute, without needing to actually create Parquet files for this + /// partition + pub fn with_new_file_at(mut self, time: Timestamp) -> Self { + self.partition.new_file_at = Some(time); + self + } + /// Create the partition pub fn build(self) -> Partition { self.partition From 3199d65c2fe63efe9515171a7544ec9deff97456 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 31 Mar 2023 16:16:17 -0400 Subject: [PATCH 2/2] feat: Add the ability to specify a max threshold duration on CatalogToCompactPartitionsSource --- compactor2/src/components/hardcoded.rs | 1 + .../partitions_source/catalog_to_compact.rs | 84 ++++++++++++++++--- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 1af1150d9c..98c3501782 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -130,6 +130,7 @@ fn make_partitions_source_commit_partition_sink( config.backoff_config.clone(), Arc::clone(&config.catalog), config.partition_threshold, + None, // Recent writes is `partition_threshold` ago to now Arc::clone(&config.time_provider), )) } diff --git a/compactor2/src/components/partitions_source/catalog_to_compact.rs b/compactor2/src/components/partitions_source/catalog_to_compact.rs index cec56ff7f2..e516e08ccc 100644 --- a/compactor2/src/components/partitions_source/catalog_to_compact.rs +++ b/compactor2/src/components/partitions_source/catalog_to_compact.rs @@ -9,11 +9,20 @@ use iox_time::TimeProvider; use super::PartitionsSource; #[derive(Debug)] -/// Returns all partitions that had a new parquet file written more than `threshold` ago. +/// Returns all partitions that had a new Parquet file written after a lower bound of the current +/// time minus `min_threshold` and optionally limited only to those with Parquet files written +/// before the current time minus `max_threshold`. +/// +/// If `max_threshold` is not specified, the upper bound is effectively the current time. +/// +/// If `max_threshold` is specified, it must be less than `min_threshold` so that when computing +/// the range endpoints as `(now - min_threshold, now - max_threshold)`, the lower bound is lower +/// than the upper bound. pub struct CatalogToCompactPartitionsSource { backoff_config: BackoffConfig, catalog: Arc, - threshold: Duration, + min_threshold: Duration, + max_threshold: Option, time_provider: Arc, } @@ -21,13 +30,15 @@ impl CatalogToCompactPartitionsSource { pub fn new( backoff_config: BackoffConfig, catalog: Arc, - threshold: Duration, + min_threshold: Duration, + max_threshold: Option, time_provider: Arc, ) -> Self { Self { backoff_config, catalog, - threshold, + min_threshold, + max_threshold, time_provider, } } @@ -42,7 +53,8 @@ impl Display for CatalogToCompactPartitionsSource { #[async_trait] impl PartitionsSource for CatalogToCompactPartitionsSource { async fn fetch(&self) -> Vec { - let cutoff = self.time_provider.now() - self.threshold; + let minimum_time = self.time_provider.now() - self.min_threshold; + let maximum_time = self.max_threshold.map(|max| self.time_provider.now() - max); Backoff::new(&self.backoff_config) .retry_all_errors("partitions_to_compact", || async { @@ -50,7 +62,7 @@ impl PartitionsSource for CatalogToCompactPartitionsSource { .repositories() .await .partitions() - .partitions_new_file_between(cutoff.into(), None) + .partitions_new_file_between(minimum_time.into(), maximum_time.map(Into::into)) .await }) .await @@ -66,16 +78,22 @@ mod tests { use iox_tests::PartitionBuilder; fn partition_ids(ids: &[i64]) -> Vec { - ids.iter().cloned().map(|id| PartitionId::new(id)).collect() + ids.iter().cloned().map(PartitionId::new).collect() } - async fn fetch_test(catalog: Arc, min_threshold: Duration, expected_ids: &[i64]) { + async fn fetch_test( + catalog: Arc, + min_threshold: Duration, + max_threshold: Option, + expected_ids: &[i64], + ) { let time_provider = catalog.time_provider(); let partitions_source = CatalogToCompactPartitionsSource::new( Default::default(), catalog, min_threshold, + max_threshold, time_provider, ); @@ -85,7 +103,8 @@ mod tests { assert_eq!( actual_partition_ids, partition_ids(expected_ids), - "CatalogToCompact source with min_threshold {min_threshold:?} failed", + "CatalogToCompact source with min_threshold {min_threshold:?} and \ + max_threshold {max_threshold:?} failed", ); } @@ -108,12 +127,53 @@ mod tests { } let one_minute = Duration::from_secs(60); - fetch_test(Arc::clone(&catalog), one_minute, &[]).await; + fetch_test(Arc::clone(&catalog), one_minute, None, &[]).await; let four_hours = Duration::from_secs(60 * 60 * 4); - fetch_test(Arc::clone(&catalog), four_hours, &[1]).await; + fetch_test(Arc::clone(&catalog), four_hours, None, &[1]).await; let seven_hours = Duration::from_secs(60 * 60 * 7); - fetch_test(Arc::clone(&catalog), seven_hours, &[1, 2]).await; + fetch_test(Arc::clone(&catalog), seven_hours, None, &[1, 2]).await; + } + + #[tokio::test] + async fn max_specified() { + let catalog = Arc::new(MemCatalog::new(Default::default())); + let time_provider = catalog.time_provider(); + + let time_now = Timestamp::from(time_provider.now()); + let time_three_hour_ago = Timestamp::from(time_provider.hours_ago(3)); + let time_six_hour_ago = Timestamp::from(time_provider.hours_ago(6)); + + for (id, time) in [ + (1, time_now), + (2, time_three_hour_ago), + (3, time_six_hour_ago), + ] + .iter() + .cloned() + { + let partition = PartitionBuilder::new(id as i64) + .with_new_file_at(time) + .build(); + catalog.add_partition(partition).await; + } + + let one_minute = Duration::from_secs(60); + let one_hour = Duration::from_secs(60 * 60); + let four_hours = Duration::from_secs(60 * 60 * 4); + let seven_hours = Duration::from_secs(60 * 60 * 7); + + fetch_test(Arc::clone(&catalog), seven_hours, Some(four_hours), &[3]).await; + + fetch_test(Arc::clone(&catalog), seven_hours, Some(one_hour), &[2, 3]).await; + + fetch_test(Arc::clone(&catalog), seven_hours, Some(one_minute), &[2, 3]).await; + + fetch_test(Arc::clone(&catalog), four_hours, Some(one_hour), &[2]).await; + + fetch_test(Arc::clone(&catalog), four_hours, Some(one_minute), &[2]).await; + + fetch_test(Arc::clone(&catalog), one_hour, Some(one_minute), &[]).await; } }