feat: Add the ability to specify a max threshold duration on CatalogToCompactPartitionsSource
parent
a244e5b078
commit
3199d65c2f
|
@ -130,6 +130,7 @@ fn make_partitions_source_commit_partition_sink(
|
|||
config.backoff_config.clone(),
|
||||
Arc::clone(&config.catalog),
|
||||
config.partition_threshold,
|
||||
None, // No max threshold: recent writes span from `partition_threshold` ago to now
|
||||
Arc::clone(&config.time_provider),
|
||||
))
|
||||
}
|
||||
|
|
|
@ -9,11 +9,20 @@ use iox_time::TimeProvider;
|
|||
use super::PartitionsSource;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Returns all partitions that had a new parquet file written more than `threshold` ago.
|
||||
/// Returns all partitions that had a new Parquet file written after a lower bound of the current
|
||||
/// time minus `min_threshold` and optionally limited only to those with Parquet files written
|
||||
/// before the current time minus `max_threshold`.
|
||||
///
|
||||
/// If `max_threshold` is not specified, the upper bound is effectively the current time.
|
||||
///
|
||||
/// If `max_threshold` is specified, it must be less than `min_threshold` so that when computing
|
||||
/// the range endpoints as `(now - min_threshold, now - max_threshold)`, the lower bound is lower
|
||||
/// than the upper bound.
|
||||
pub struct CatalogToCompactPartitionsSource {
|
||||
backoff_config: BackoffConfig,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
threshold: Duration,
|
||||
min_threshold: Duration,
|
||||
max_threshold: Option<Duration>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
}
|
||||
|
||||
|
@ -21,13 +30,15 @@ impl CatalogToCompactPartitionsSource {
|
|||
pub fn new(
|
||||
backoff_config: BackoffConfig,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
threshold: Duration,
|
||||
min_threshold: Duration,
|
||||
max_threshold: Option<Duration>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
) -> Self {
|
||||
Self {
|
||||
backoff_config,
|
||||
catalog,
|
||||
threshold,
|
||||
min_threshold,
|
||||
max_threshold,
|
||||
time_provider,
|
||||
}
|
||||
}
|
||||
|
@ -42,7 +53,8 @@ impl Display for CatalogToCompactPartitionsSource {
|
|||
#[async_trait]
|
||||
impl PartitionsSource for CatalogToCompactPartitionsSource {
|
||||
async fn fetch(&self) -> Vec<PartitionId> {
|
||||
let cutoff = self.time_provider.now() - self.threshold;
|
||||
let minimum_time = self.time_provider.now() - self.min_threshold;
|
||||
let maximum_time = self.max_threshold.map(|max| self.time_provider.now() - max);
|
||||
|
||||
Backoff::new(&self.backoff_config)
|
||||
.retry_all_errors("partitions_to_compact", || async {
|
||||
|
@ -50,7 +62,7 @@ impl PartitionsSource for CatalogToCompactPartitionsSource {
|
|||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.partitions_new_file_between(cutoff.into(), None)
|
||||
.partitions_new_file_between(minimum_time.into(), maximum_time.map(Into::into))
|
||||
.await
|
||||
})
|
||||
.await
|
||||
|
@ -66,16 +78,22 @@ mod tests {
|
|||
use iox_tests::PartitionBuilder;
|
||||
|
||||
fn partition_ids(ids: &[i64]) -> Vec<PartitionId> {
|
||||
ids.iter().cloned().map(|id| PartitionId::new(id)).collect()
|
||||
ids.iter().cloned().map(PartitionId::new).collect()
|
||||
}
|
||||
|
||||
async fn fetch_test(catalog: Arc<MemCatalog>, min_threshold: Duration, expected_ids: &[i64]) {
|
||||
async fn fetch_test(
|
||||
catalog: Arc<MemCatalog>,
|
||||
min_threshold: Duration,
|
||||
max_threshold: Option<Duration>,
|
||||
expected_ids: &[i64],
|
||||
) {
|
||||
let time_provider = catalog.time_provider();
|
||||
|
||||
let partitions_source = CatalogToCompactPartitionsSource::new(
|
||||
Default::default(),
|
||||
catalog,
|
||||
min_threshold,
|
||||
max_threshold,
|
||||
time_provider,
|
||||
);
|
||||
|
||||
|
@ -85,7 +103,8 @@ mod tests {
|
|||
assert_eq!(
|
||||
actual_partition_ids,
|
||||
partition_ids(expected_ids),
|
||||
"CatalogToCompact source with min_threshold {min_threshold:?} failed",
|
||||
"CatalogToCompact source with min_threshold {min_threshold:?} and \
|
||||
max_threshold {max_threshold:?} failed",
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -108,12 +127,53 @@ mod tests {
|
|||
}
|
||||
|
||||
let one_minute = Duration::from_secs(60);
|
||||
fetch_test(Arc::clone(&catalog), one_minute, &[]).await;
|
||||
fetch_test(Arc::clone(&catalog), one_minute, None, &[]).await;
|
||||
|
||||
let four_hours = Duration::from_secs(60 * 60 * 4);
|
||||
fetch_test(Arc::clone(&catalog), four_hours, &[1]).await;
|
||||
fetch_test(Arc::clone(&catalog), four_hours, None, &[1]).await;
|
||||
|
||||
let seven_hours = Duration::from_secs(60 * 60 * 7);
|
||||
fetch_test(Arc::clone(&catalog), seven_hours, &[1, 2]).await;
|
||||
fetch_test(Arc::clone(&catalog), seven_hours, None, &[1, 2]).await;
|
||||
}
|
||||
|
||||
// Checks the partition IDs obtained through `fetch_test` when a `max_threshold` is supplied
// in addition to `min_threshold`.
#[tokio::test]
|
||||
async fn max_specified() {
|
||||
let catalog = Arc::new(MemCatalog::new(Default::default()));
|
||||
let time_provider = catalog.time_provider();
|
||||
|
||||
// Reference timestamps, all derived from the catalog's own time provider.
let time_now = Timestamp::from(time_provider.now());
|
||||
let time_three_hour_ago = Timestamp::from(time_provider.hours_ago(3));
|
||||
let time_six_hour_ago = Timestamp::from(time_provider.hours_ago(6));
|
||||
|
||||
// Fixture: partition 1 is built with a new-file time of now, partition 2 with 3 hours ago,
// and partition 3 with 6 hours ago.
for (id, time) in [
|
||||
(1, time_now),
|
||||
(2, time_three_hour_ago),
|
||||
(3, time_six_hour_ago),
|
||||
]
|
||||
.iter()
|
||||
.cloned()
|
||||
{
|
||||
let partition = PartitionBuilder::new(id as i64)
|
||||
.with_new_file_at(time)
|
||||
.build();
|
||||
catalog.add_partition(partition).await;
|
||||
}
|
||||
|
||||
// Threshold durations used below; each is subtracted from "now" to form a window endpoint.
let one_minute = Duration::from_secs(60);
|
||||
let one_hour = Duration::from_secs(60 * 60);
|
||||
let four_hours = Duration::from_secs(60 * 60 * 4);
|
||||
let seven_hours = Duration::from_secs(60 * 60 * 7);
|
||||
|
||||
// min 7h, max 4h: only partition 3 (6 hours ago) is expected.
fetch_test(Arc::clone(&catalog), seven_hours, Some(four_hours), &[3]).await;
|
||||
|
||||
// min 7h, max 1h: partitions 2 (3 hours ago) and 3 (6 hours ago) are expected; partition 1
// (now) is not.
fetch_test(Arc::clone(&catalog), seven_hours, Some(one_hour), &[2, 3]).await;
|
||||
|
||||
// min 7h, max 1min: same expectation; partition 1 (now) is still left out.
fetch_test(Arc::clone(&catalog), seven_hours, Some(one_minute), &[2, 3]).await;
|
||||
|
||||
// min 4h, max 1h: only partition 2 is expected; partition 3 (6 hours ago) is left out.
fetch_test(Arc::clone(&catalog), four_hours, Some(one_hour), &[2]).await;
|
||||
|
||||
// min 4h, max 1min: still only partition 2 is expected.
fetch_test(Arc::clone(&catalog), four_hours, Some(one_minute), &[2]).await;
|
||||
|
||||
// min 1h, max 1min: no partition is expected.
fetch_test(Arc::clone(&catalog), one_hour, Some(one_minute), &[]).await;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue