Merge pull request #7417 from influxdata/cn/cold-compaction-selection
feat: Add the ability to specify a max threshold duration on CatalogToCompactPartitionsSource
pull/24376/head
commit
ffe8714956
|
@ -130,6 +130,7 @@ fn make_partitions_source_commit_partition_sink(
|
||||||
config.backoff_config.clone(),
|
config.backoff_config.clone(),
|
||||||
Arc::clone(&config.catalog),
|
Arc::clone(&config.catalog),
|
||||||
config.partition_threshold,
|
config.partition_threshold,
|
||||||
|
None, // Recent writes are from `partition_threshold` ago to now
|
||||||
Arc::clone(&config.time_provider),
|
Arc::clone(&config.time_provider),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,11 +9,20 @@ use iox_time::TimeProvider;
|
||||||
use super::PartitionsSource;
|
use super::PartitionsSource;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Returns all partitions that had a new parquet file written more than `threshold` ago.
|
/// Returns all partitions that had a new Parquet file written after a lower bound of the current
|
||||||
|
/// time minus `min_threshold` and optionally limited only to those with Parquet files written
|
||||||
|
/// before the current time minus `max_threshold`.
|
||||||
|
///
|
||||||
|
/// If `max_threshold` is not specified, the upper bound is effectively the current time.
|
||||||
|
///
|
||||||
|
/// If `max_threshold` is specified, it must be less than `min_threshold` so that when computing
|
||||||
|
/// the range endpoints as `(now - min_threshold, now - max_threshold)`, the lower bound is lower
|
||||||
|
/// than the upper bound.
|
||||||
pub struct CatalogToCompactPartitionsSource {
|
pub struct CatalogToCompactPartitionsSource {
|
||||||
backoff_config: BackoffConfig,
|
backoff_config: BackoffConfig,
|
||||||
catalog: Arc<dyn Catalog>,
|
catalog: Arc<dyn Catalog>,
|
||||||
threshold: Duration,
|
min_threshold: Duration,
|
||||||
|
max_threshold: Option<Duration>,
|
||||||
time_provider: Arc<dyn TimeProvider>,
|
time_provider: Arc<dyn TimeProvider>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,13 +30,15 @@ impl CatalogToCompactPartitionsSource {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
backoff_config: BackoffConfig,
|
backoff_config: BackoffConfig,
|
||||||
catalog: Arc<dyn Catalog>,
|
catalog: Arc<dyn Catalog>,
|
||||||
threshold: Duration,
|
min_threshold: Duration,
|
||||||
|
max_threshold: Option<Duration>,
|
||||||
time_provider: Arc<dyn TimeProvider>,
|
time_provider: Arc<dyn TimeProvider>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
backoff_config,
|
backoff_config,
|
||||||
catalog,
|
catalog,
|
||||||
threshold,
|
min_threshold,
|
||||||
|
max_threshold,
|
||||||
time_provider,
|
time_provider,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,7 +53,8 @@ impl Display for CatalogToCompactPartitionsSource {
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl PartitionsSource for CatalogToCompactPartitionsSource {
|
impl PartitionsSource for CatalogToCompactPartitionsSource {
|
||||||
async fn fetch(&self) -> Vec<PartitionId> {
|
async fn fetch(&self) -> Vec<PartitionId> {
|
||||||
let cutoff = self.time_provider.now() - self.threshold;
|
let minimum_time = self.time_provider.now() - self.min_threshold;
|
||||||
|
let maximum_time = self.max_threshold.map(|max| self.time_provider.now() - max);
|
||||||
|
|
||||||
Backoff::new(&self.backoff_config)
|
Backoff::new(&self.backoff_config)
|
||||||
.retry_all_errors("partitions_to_compact", || async {
|
.retry_all_errors("partitions_to_compact", || async {
|
||||||
|
@ -50,10 +62,118 @@ impl PartitionsSource for CatalogToCompactPartitionsSource {
|
||||||
.repositories()
|
.repositories()
|
||||||
.await
|
.await
|
||||||
.partitions()
|
.partitions()
|
||||||
.partitions_new_file_between(cutoff.into(), None)
|
.partitions_new_file_between(minimum_time.into(), maximum_time.map(Into::into))
|
||||||
.await
|
.await
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.expect("retry forever")
|
.expect("retry forever")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use data_types::Timestamp;
|
||||||
|
use iox_catalog::mem::MemCatalog;
|
||||||
|
use iox_tests::PartitionBuilder;
|
||||||
|
|
||||||
|
fn partition_ids(ids: &[i64]) -> Vec<PartitionId> {
|
||||||
|
ids.iter().cloned().map(PartitionId::new).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_test(
|
||||||
|
catalog: Arc<MemCatalog>,
|
||||||
|
min_threshold: Duration,
|
||||||
|
max_threshold: Option<Duration>,
|
||||||
|
expected_ids: &[i64],
|
||||||
|
) {
|
||||||
|
let time_provider = catalog.time_provider();
|
||||||
|
|
||||||
|
let partitions_source = CatalogToCompactPartitionsSource::new(
|
||||||
|
Default::default(),
|
||||||
|
catalog,
|
||||||
|
min_threshold,
|
||||||
|
max_threshold,
|
||||||
|
time_provider,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut actual_partition_ids = partitions_source.fetch().await;
|
||||||
|
actual_partition_ids.sort();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
actual_partition_ids,
|
||||||
|
partition_ids(expected_ids),
|
||||||
|
"CatalogToCompact source with min_threshold {min_threshold:?} and \
|
||||||
|
max_threshold {max_threshold:?} failed",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn no_max_specified() {
|
||||||
|
let catalog = Arc::new(MemCatalog::new(Default::default()));
|
||||||
|
let time_provider = catalog.time_provider();
|
||||||
|
|
||||||
|
let time_three_hour_ago = Timestamp::from(time_provider.hours_ago(3));
|
||||||
|
let time_six_hour_ago = Timestamp::from(time_provider.hours_ago(6));
|
||||||
|
|
||||||
|
for (id, time) in [(1, time_three_hour_ago), (2, time_six_hour_ago)]
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
{
|
||||||
|
let partition = PartitionBuilder::new(id as i64)
|
||||||
|
.with_new_file_at(time)
|
||||||
|
.build();
|
||||||
|
catalog.add_partition(partition).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let one_minute = Duration::from_secs(60);
|
||||||
|
fetch_test(Arc::clone(&catalog), one_minute, None, &[]).await;
|
||||||
|
|
||||||
|
let four_hours = Duration::from_secs(60 * 60 * 4);
|
||||||
|
fetch_test(Arc::clone(&catalog), four_hours, None, &[1]).await;
|
||||||
|
|
||||||
|
let seven_hours = Duration::from_secs(60 * 60 * 7);
|
||||||
|
fetch_test(Arc::clone(&catalog), seven_hours, None, &[1, 2]).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn max_specified() {
|
||||||
|
let catalog = Arc::new(MemCatalog::new(Default::default()));
|
||||||
|
let time_provider = catalog.time_provider();
|
||||||
|
|
||||||
|
let time_now = Timestamp::from(time_provider.now());
|
||||||
|
let time_three_hour_ago = Timestamp::from(time_provider.hours_ago(3));
|
||||||
|
let time_six_hour_ago = Timestamp::from(time_provider.hours_ago(6));
|
||||||
|
|
||||||
|
for (id, time) in [
|
||||||
|
(1, time_now),
|
||||||
|
(2, time_three_hour_ago),
|
||||||
|
(3, time_six_hour_ago),
|
||||||
|
]
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
{
|
||||||
|
let partition = PartitionBuilder::new(id as i64)
|
||||||
|
.with_new_file_at(time)
|
||||||
|
.build();
|
||||||
|
catalog.add_partition(partition).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let one_minute = Duration::from_secs(60);
|
||||||
|
let one_hour = Duration::from_secs(60 * 60);
|
||||||
|
let four_hours = Duration::from_secs(60 * 60 * 4);
|
||||||
|
let seven_hours = Duration::from_secs(60 * 60 * 7);
|
||||||
|
|
||||||
|
fetch_test(Arc::clone(&catalog), seven_hours, Some(four_hours), &[3]).await;
|
||||||
|
|
||||||
|
fetch_test(Arc::clone(&catalog), seven_hours, Some(one_hour), &[2, 3]).await;
|
||||||
|
|
||||||
|
fetch_test(Arc::clone(&catalog), seven_hours, Some(one_minute), &[2, 3]).await;
|
||||||
|
|
||||||
|
fetch_test(Arc::clone(&catalog), four_hours, Some(one_hour), &[2]).await;
|
||||||
|
|
||||||
|
fetch_test(Arc::clone(&catalog), four_hours, Some(one_minute), &[2]).await;
|
||||||
|
|
||||||
|
fetch_test(Arc::clone(&catalog), one_hour, Some(one_minute), &[]).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -48,6 +48,13 @@ impl MemCatalog {
|
||||||
time_provider: Arc::new(SystemProvider::new()),
|
time_provider: Arc::new(SystemProvider::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add partition directly, for testing purposes only as it does not do any consistency or
|
||||||
|
/// uniqueness checks
|
||||||
|
pub async fn add_partition(&self, partition: Partition) {
|
||||||
|
let mut collections = Arc::clone(&self.collections).lock_owned().await;
|
||||||
|
collections.partitions.push(partition);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for MemCatalog {
|
impl std::fmt::Debug for MemCatalog {
|
||||||
|
|
|
@ -144,7 +144,7 @@ impl TableBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Builds [`Partition`]s for testing
|
/// Builds [`Partition`]s for testing
|
||||||
pub struct PartitionBuilder {
|
pub struct PartitionBuilder {
|
||||||
partition: Partition,
|
partition: Partition,
|
||||||
}
|
}
|
||||||
|
@ -165,6 +165,13 @@ impl PartitionBuilder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the `new_file_at` attribute, without needing to actually create Parquet files for this
|
||||||
|
/// partition
|
||||||
|
pub fn with_new_file_at(mut self, time: Timestamp) -> Self {
|
||||||
|
self.partition.new_file_at = Some(time);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Create the partition
|
/// Create the partition
|
||||||
pub fn build(self) -> Partition {
|
pub fn build(self) -> Partition {
|
||||||
self.partition
|
self.partition
|
||||||
|
|
Loading…
Reference in New Issue