fix: have warm compaction work with compactor2 (#6571)

* refactor: same function to select partition candidates

* fix: have warm compaction work with compactor2

* fix: format

* chore: cleanup
pull/24376/head
Nga Tran 2023-01-11 21:32:39 -05:00 committed by GitHub
parent 1f508b76fc
commit fa0893819c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 158 additions and 621 deletions

View File

@ -245,6 +245,17 @@ macro_rules! gen_compactor_config {
)]
pub max_parallel_partitions: u64,
/// When selecting warm partition candidates, partitions with newly created files (any level) after
/// this threshold (in hours) will be considered candidates. However, only partitions with many
/// contiguous small L1 files will get warm compacted.
#[clap(
long = "compaction-warm-partition_candidate-hours-threshold",
env = "INFLUXDB_IOX_COMPACTION_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD",
default_value = "24",
action
)]
pub warm_partition_candidates_hours_threshold: u64,
/// When querying for partitions suitable for warm compaction, this is the
/// upper bound on file size to be counted as "small".
/// Default is half of max_desired_file_size_bytes's default (see above).
@ -300,6 +311,7 @@ impl CompactorOnceConfig {
hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2,
max_parallel_partitions: self.max_parallel_partitions,
warm_partition_candidates_hours_threshold: self.warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes: self.warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count: self.warm_compaction_min_small_file_count,
}

View File

@ -223,6 +223,17 @@ macro_rules! gen_compactor_config {
)]
pub max_parallel_partitions: u64,
/// When selecting warm partition candidates, partitions with newly created files (any level) after
/// this threshold (in hours) will be considered candidates. However, only partitions with many
/// contiguous small L1 files will get warm compacted.
#[clap(
long = "compaction-warm-partition_candidate-hours-threshold",
env = "INFLUXDB_IOX_COMPACTION_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD",
default_value = "24",
action
)]
pub warm_partition_candidates_hours_threshold: u64,
/// When querying for partitions suitable for warm compaction, this is the
/// upper bound on file size to be counted as "small".
/// Default is half of max_desired_file_size_bytes's default (see above).
@ -275,6 +286,7 @@ impl Compactor2OnceConfig {
hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2,
max_parallel_partitions: self.max_parallel_partitions,
warm_partition_candidates_hours_threshold: self.warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes: self.warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count: self.warm_compaction_min_small_file_count,
}

View File

@ -2,7 +2,7 @@
//! fully compacted.
use crate::{
compact::{Compactor, ShardAssignment},
compact::Compactor,
compact_candidates_with_memory_budget, compact_in_parallel, parquet_file_combining,
parquet_file_lookup::{self, CompactionType},
utils::get_candidates_with_retry,
@ -20,25 +20,8 @@ pub async fn compact(compactor: Arc<Compactor>, do_full_compact: bool) -> usize
// https://github.com/influxdata/influxdb_iox/issues/6518 to remove the use of shard_id and
// simplify this
let max_num_partitions = match &compactor.shards {
ShardAssignment::All => {
debug!(
%compaction_type,
max_num_partitions = compactor.config.max_number_partitions_per_shard,
"Compactor2"
);
compactor.config.max_number_partitions_per_shard
}
ShardAssignment::Only(shards) => {
debug!(
%compaction_type,
num_shards = shards.len(),
max_number_partitions_per_shard = compactor.config.max_number_partitions_per_shard,
"Compactor1"
);
compactor.config.max_number_partitions_per_shard * shards.len()
}
};
let max_num_partitions =
compactor.shards.len() * compactor.config.max_number_partitions_per_shard;
let cold_partition_candidates_hours_threshold =
compactor.config.cold_partition_candidates_hours_threshold;
@ -150,6 +133,7 @@ mod tests {
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4;
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24;
const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const MINUTE_WITHOUT_NEW_WRITE_TO_BE_COLD: u64 = 10;
const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20;
@ -759,6 +743,8 @@ mod tests {
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS,
warm_partition_candidates_hours_threshold:
DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD,
warm_compaction_small_size_threshold_bytes: 5_000,
warm_compaction_min_small_file_count: 10,
}

View File

@ -756,6 +756,8 @@ pub mod tests {
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS,
warm_partition_candidates_hours_threshold:
DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD,
warm_compaction_small_size_threshold_bytes: 5_000,
warm_compaction_min_small_file_count: 10,
}

View File

@ -190,6 +190,11 @@ pub struct CompactorConfig {
/// and may also lead to inaccuracy of memory estimation. This number is to cap that.
pub max_parallel_partitions: u64,
/// When selecting warm partition candidates, partitions with newly created files (any level) after
/// this threshold (in hours) will be considered candidates. However, only partitions with many
/// contiguous small L1 files will get warm compacted.
pub warm_partition_candidates_hours_threshold: u64,
/// Upper bound on file size to be counted as "small" for warm compaction.
pub warm_compaction_small_size_threshold_bytes: i64,

View File

@ -1,10 +1,8 @@
//! Collect highest hot candidates and compact them
use crate::{
compact::{Compactor, ShardAssignment},
compact_candidates_with_memory_budget, compact_in_parallel,
parquet_file_lookup::CompactionType,
utils::get_candidates_with_retry,
compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel,
parquet_file_lookup::CompactionType, utils::get_candidates_with_retry,
};
use data_types::CompactionLevel;
use metric::Attributes;
@ -17,25 +15,8 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
// https://github.com/influxdata/influxdb_iox/issues/6518 to remove the use of shard_id and
// simplify this
let max_num_partitions = match &compactor.shards {
ShardAssignment::All => {
debug!(
%compaction_type,
max_num_partitions = compactor.config.max_number_partitions_per_shard,
"Compactor2"
);
compactor.config.max_number_partitions_per_shard
}
ShardAssignment::Only(shards) => {
debug!(
%compaction_type,
num_shards = shards.len(),
max_number_partitions_per_shard = compactor.config.max_number_partitions_per_shard,
"Compactor1"
);
compactor.config.max_number_partitions_per_shard * shards.len()
}
};
let max_num_partitions =
compactor.shards.len() * compactor.config.max_number_partitions_per_shard;
let hour_threshold_1 = compactor.config.hot_compaction_hours_threshold_1;
let hour_threshold_2 = compactor.config.hot_compaction_hours_threshold_2;
@ -106,6 +87,7 @@ mod tests {
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4;
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24;
const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20;
const DEFAULT_MAX_NUM_PARTITION_CANDIDATES: usize = 10;
@ -462,6 +444,8 @@ mod tests {
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS,
warm_partition_candidates_hours_threshold:
DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD,
warm_compaction_small_size_threshold_bytes: 5_000,
warm_compaction_min_small_file_count: 10,
};

View File

@ -489,6 +489,7 @@ pub mod tests {
const DEFAULT_MAX_NUM_PARTITION_CANDIDATES: usize = 100;
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4;
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24;
const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20;
@ -613,6 +614,8 @@ pub mod tests {
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
max_parallel_partitions: max_parallel_jobs,
warm_partition_candidates_hours_threshold:
DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD,
warm_compaction_small_size_threshold_bytes: 50_000_000,
warm_compaction_min_small_file_count: 10,
}
@ -945,6 +948,8 @@ pub mod tests {
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS,
warm_partition_candidates_hours_threshold:
DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD,
warm_compaction_small_size_threshold_bytes: 5_000,
warm_compaction_min_small_file_count: 10,
};

View File

@ -116,19 +116,34 @@ impl ParquetFilesForCompaction {
return Ok(None);
}
let mut count_small_l1 = 0;
for parquet_file in parquet_files {
// For cold compaction, won't proceed if at least one L0 file created after
// minutes_without_new_writes_to_be_cold
if parquet_file.compaction_level == CompactionLevel::Initial
&& compaction_type == CompactionType::Cold
&& parquet_file.created_at
> Timestamp::from(
compactor
.time_provider
.minutes_ago(minutes_without_new_writes_to_be_cold),
)
{
return Ok(None);
match compaction_type {
CompactionType::Warm => {
// Count number of small L1 files
if parquet_file.compaction_level == CompactionLevel::FileNonOverlapped
&& parquet_file.file_size_bytes
< compactor.config.warm_compaction_small_size_threshold_bytes
{
count_small_l1 += 1;
}
}
CompactionType::Cold => {
// won't proceed if at least one L0 file created after
// minutes_without_new_writes_to_be_cold
if parquet_file.compaction_level == CompactionLevel::Initial
&& compaction_type == CompactionType::Cold
&& parquet_file.created_at
> Timestamp::from(
compactor
.time_provider
.minutes_ago(minutes_without_new_writes_to_be_cold),
)
{
return Ok(None);
}
}
CompactionType::Hot => {}
}
// Estimate the bytes DataFusion needs when scan this file
@ -162,6 +177,22 @@ impl ParquetFilesForCompaction {
return Ok(None);
}
match compaction_type {
CompactionType::Hot => {
// Only do hot compaction if there are L0 files
if level_0.is_empty() {
return Ok(None);
}
}
CompactionType::Warm => {
// Only do warm compaction if there are certain small L1 files
if count_small_l1 < compactor.config.warm_compaction_min_small_file_count {
return Ok(None);
}
}
CompactionType::Cold => {}
}
level_0.sort_by_key(|pf| pf.created_at());
level_1.sort_by_key(|pf| pf.min_time());
@ -186,6 +217,7 @@ mod tests {
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4;
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24;
const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20;
const DEFAULT_MINUTES_WITHOUT_NEW_WRITES: u64 = 8 * 60;
@ -271,6 +303,8 @@ mod tests {
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS,
warm_partition_candidates_hours_threshold:
DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD,
warm_compaction_small_size_threshold_bytes: 5_000,
warm_compaction_min_small_file_count: 10,
}
@ -389,7 +423,7 @@ mod tests {
let parquet_files_for_compaction = ParquetFilesForCompaction::for_partition(
compactor,
partition_with_info,
CompactionType::Hot,
CompactionType::Cold,
)
.await
.unwrap()

View File

@ -1,14 +1,10 @@
//! Select partitions with small, adjacent L1 files and compact them
use crate::{
compact::{self, Compactor, ShardAssignment},
compact_candidates_with_memory_budget, compact_in_parallel,
parquet_file_lookup::CompactionType,
utils::get_candidates_with_retry,
PartitionCompactionCandidateWithInfo,
compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel,
parquet_file_lookup::CompactionType, utils::get_candidates_with_retry,
};
use data_types::{CompactionLevel, PartitionParam, ShardId};
use iox_catalog::interface::Catalog;
use data_types::CompactionLevel;
use metric::Attributes;
use observability_deps::tracing::*;
use std::sync::Arc;
@ -17,10 +13,20 @@ use std::sync::Arc;
pub async fn compact(compactor: Arc<Compactor>) -> usize {
let compaction_type = CompactionType::Warm;
// https://github.com/influxdata/influxdb_iox/issues/6518 to remove the use of shard_id and
// simplify this
let max_num_partitions =
compactor.shards.len() * compactor.config.max_number_partitions_per_shard;
let hours_threshold = compactor.config.warm_partition_candidates_hours_threshold;
let candidates = get_candidates_with_retry(
Arc::clone(&compactor),
compaction_type,
|compactor_for_retry| async move { warm_partitions_to_compact(compactor_for_retry).await },
move |compactor_for_retry| async move {
compactor_for_retry
.partitions_to_compact(compaction_type, vec![hours_threshold], max_num_partitions)
.await
},
)
.await;
@ -60,402 +66,30 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
n_candidates
}
/// Return the partitions that have enough small L1 files to make warm compaction worthwhile,
/// with the extra information needed to compact them attached.
///
/// Candidates come from `warm_partitions_for_shard`, which is handed the configured
/// `warm_compaction_small_size_threshold_bytes` and `warm_compaction_min_small_file_count`.
/// It is called once without a shard for `ShardAssignment::All`, otherwise once per assigned
/// shard. The number of candidates found by each call is published on
/// `compaction_candidate_gauge`.
///
/// # Errors
///
/// Propagates any `compact::Error` returned by the candidate lookup, by `table_columns`, or by
/// `add_info_to_partitions`.
pub(crate) async fn warm_partitions_to_compact(
compactor: Arc<Compactor>,
) -> Result<Vec<Arc<PartitionCompactionCandidateWithInfo>>, compact::Error> {
// Label used for log fields and metric attributes below
let compaction_type = "warm";
let max_number_partitions_per_shard = compactor.config.max_number_partitions_per_shard;
// Reserve room for the per-shard maximum times the number of shards
let mut candidates =
Vec::with_capacity(compactor.shards.len() * max_number_partitions_per_shard);
match &compactor.shards {
ShardAssignment::All => {
// No specific shard: a single lookup with `None` as the shard
let mut partitions = warm_partitions_for_shard(
Arc::clone(&compactor.catalog),
None,
compactor.config.warm_compaction_small_size_threshold_bytes,
compactor.config.warm_compaction_min_small_file_count,
max_number_partitions_per_shard,
)
.await?;
// Record the candidate count; only the partition type is attached as an attribute here
let num_partitions = partitions.len();
debug!(n = num_partitions, compaction_type, "compaction candidates",);
let attributes = Attributes::from([("partition_type", compaction_type.into())]);
let number_gauge = compactor.compaction_candidate_gauge.recorder(attributes);
number_gauge.set(num_partitions as u64);
candidates.append(&mut partitions);
}
ShardAssignment::Only(shards) => {
// One lookup per assigned shard; results are accumulated in `candidates`
for &shard_id in shards {
let mut partitions = warm_partitions_for_shard(
Arc::clone(&compactor.catalog),
Some(shard_id),
compactor.config.warm_compaction_small_size_threshold_bytes,
compactor.config.warm_compaction_min_small_file_count,
max_number_partitions_per_shard,
)
.await?;
// Record metric for candidates per shard
let num_partitions = partitions.len();
debug!(
shard_id = shard_id.get(),
n = num_partitions,
compaction_type,
"compaction candidates",
);
let attributes = Attributes::from([
("shard_id", format!("{}", shard_id).into()),
("partition_type", compaction_type.into()),
]);
let number_gauge = compactor.compaction_candidate_gauge.recorder(attributes);
number_gauge.set(num_partitions as u64);
candidates.append(&mut partitions);
}
}
}
// Get extra needed information for selected partitions; the time spent doing so is measured
// from here
let start_time = compactor.time_provider.now();
// Column types and their counts of the tables of the partition candidates
debug!(
num_candidates=?candidates.len(),
compaction_type,
"start getting column types for the partition candidates"
);
let table_columns = compactor.table_columns(&candidates).await?;
// Add other compaction-needed info into selected partitions
debug!(
num_candidates=?candidates.len(),
compaction_type,
"start getting additional info for the partition candidates"
);
let candidates = compactor
.add_info_to_partitions(&candidates, &table_columns)
.await?;
// Record the elapsed time only when `checked_duration_since` yields a value
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let attributes = Attributes::from(&[("partition_type", compaction_type)]);
let duration = compactor
.partitions_extra_info_reading_duration
.recorder(attributes);
duration.record(delta);
}
Ok(candidates)
}
async fn warm_partitions_for_shard(
catalog: Arc<dyn Catalog>,
shard_id: Option<ShardId>,
// Upper bound on file size to be counted as "small" for warm compaction.
small_size_threshold_bytes: i64,
// Minimum number of small files a partition must have to be selected as a candidate for warm
// compaction.
min_small_file_count: usize,
// Max number of partitions per shard we want to read
max_number_partitions_per_shard: usize,
) -> Result<Vec<PartitionParam>, compact::Error> {
let mut repos = catalog.repositories().await;
let partitions = repos
.parquet_files()
.partitions_with_small_l1_file_count(
shard_id,
small_size_threshold_bytes,
min_small_file_count,
max_number_partitions_per_shard,
)
.await
.map_err(|e| compact::Error::PartitionsWithSmallL1Files {
shard_id,
source: e,
})?;
if !partitions.is_empty() {
debug!(
?shard_id,
small_size_threshold_bytes,
min_small_file_count,
n = partitions.len(),
"found some partitions with small L1 files"
);
return Ok(partitions);
}
Ok(Vec::new())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{compact::Compactor, handler::CompactorConfig};
use crate::{
compact::{Compactor, ShardAssignment},
handler::CompactorConfig,
};
use arrow_util::assert_batches_sorted_eq;
use backoff::BackoffConfig;
use data_types::{ColumnType, CompactionLevel, TablePartition, Timestamp};
use futures::{stream::FuturesUnordered, StreamExt};
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable};
use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
use iox_time::{SystemProvider, Time, TimeProvider};
use parquet_file::storage::{ParquetStorage, StorageId};
use std::sync::Arc;
/// Fixture shared by the warm-compaction tests, as built by `test_setup`.
struct TestSetup {
// Test catalog that owns the namespace, shards and table below
catalog: Arc<TestCatalog>,
// Shard created with index 1
shard1: Arc<TestShard>,
// Table named "test_table1"
table1: Arc<TestTable>,
// Shard created with index 2
shard2: Arc<TestShard>,
}
/// Create a fresh test catalog holding one namespace ("namespace_warm_compaction"), two shards
/// (indexes 1 and 2) and one table ("test_table1").
async fn test_setup() -> TestSetup {
    let catalog = TestCatalog::new();
    let ns = catalog
        .create_namespace_1hr_retention("namespace_warm_compaction")
        .await;

    // Creation order is kept as shard 1, table, shard 2
    let first_shard = ns.create_shard(1).await;
    let table = ns.create_table("test_table1").await;
    let second_shard = ns.create_shard(2).await;

    TestSetup {
        catalog,
        shard1: first_shard,
        table1: table,
        shard2: second_shard,
    }
}
#[tokio::test]
async fn no_partitions_no_candidates() {
let TestSetup {
catalog, shard1, ..
} = test_setup().await;
let candidates = warm_partitions_for_shard(
Arc::clone(&catalog.catalog),
Some(shard1.shard.id),
100,
10,
1,
)
.await
.unwrap();
assert!(candidates.is_empty());
}
#[tokio::test]
async fn no_files_no_candidates() {
let TestSetup {
catalog,
shard1,
table1,
..
} = test_setup().await;
table1.with_shard(&shard1).create_partition("one").await;
let candidates = warm_partitions_for_shard(
Arc::clone(&catalog.catalog),
Some(shard1.shard.id),
100,
10,
1,
)
.await
.unwrap();
assert!(candidates.is_empty());
}
#[tokio::test]
async fn l0_not_returned() {
let TestSetup {
catalog,
shard1,
table1,
..
} = test_setup().await;
let partition1 = table1.with_shard(&shard1).create_partition("one").await;
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Initial)
.with_file_size_bytes(100);
partition1.create_parquet_file_catalog_record(builder).await;
assert_eq!(catalog.count_level_0_files(shard1.shard.id).await, 1);
let candidates = warm_partitions_for_shard(
Arc::clone(&catalog.catalog),
Some(shard1.shard.id),
200, // anything bigger than our file will work
1,
1,
)
.await
.unwrap();
assert!(candidates.is_empty());
}
#[tokio::test]
async fn l2_not_returned() {
let TestSetup {
catalog,
shard1,
table1,
..
} = test_setup().await;
let partition1 = table1.with_shard(&shard1).create_partition("one").await;
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Final)
.with_file_size_bytes(100);
partition1.create_parquet_file_catalog_record(builder).await;
let candidates = warm_partitions_for_shard(
Arc::clone(&catalog.catalog),
Some(shard1.shard.id),
200, // anything bigger than our file will work
1,
1,
)
.await
.unwrap();
assert!(candidates.is_empty());
}
#[tokio::test]
async fn only_l1_returned() {
let TestSetup {
catalog,
shard1,
table1,
..
} = test_setup().await;
let partition1 = table1.with_shard(&shard1).create_partition("one").await;
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
partition1.create_parquet_file_catalog_record(builder).await;
let candidates = warm_partitions_for_shard(
Arc::clone(&catalog.catalog),
Some(shard1.shard.id),
200, // anything bigger than our file will work
1,
1,
)
.await
.unwrap();
assert!(!candidates.is_empty());
}
#[tokio::test]
async fn below_min_count_not_returned() {
let TestSetup {
catalog,
shard1,
table1,
..
} = test_setup().await;
let partition1 = table1.with_shard(&shard1).create_partition("one").await;
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
partition1.create_parquet_file_catalog_record(builder).await;
let candidates = warm_partitions_for_shard(
Arc::clone(&catalog.catalog),
Some(shard1.shard.id),
200,
2, // min limit is more than we have
1,
)
.await
.unwrap();
assert!(candidates.is_empty());
}
#[tokio::test]
async fn over_size_threshold_not_returned() {
let TestSetup {
catalog,
shard1,
table1,
..
} = test_setup().await;
let partition1 = table1.with_shard(&shard1).create_partition("one").await;
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(200);
partition1.create_parquet_file_catalog_record(builder).await;
let candidates = warm_partitions_for_shard(
Arc::clone(&catalog.catalog),
Some(shard1.shard.id),
100, // note, smaller than our file
1,
1,
)
.await
.unwrap();
assert!(candidates.is_empty());
}
#[tokio::test]
async fn mixed_partition_but_warm_and_returned() {
let TestSetup {
catalog,
shard1,
table1,
..
} = test_setup().await;
// create a partition that meets the criteria for being warm
let partition1 = table1.with_shard(&shard1).create_partition("one").await;
// create three small files
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
partition1
.create_parquet_file_catalog_record(builder.clone())
.await;
partition1
.create_parquet_file_catalog_record(builder.clone())
.await;
partition1.create_parquet_file_catalog_record(builder).await;
// create one file that's too big
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(1000);
partition1.create_parquet_file_catalog_record(builder).await;
// create two files that aren't L1
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Initial)
.with_file_size_bytes(200);
partition1.create_parquet_file_catalog_record(builder).await;
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Final)
.with_file_size_bytes(200);
partition1.create_parquet_file_catalog_record(builder).await;
// create two more small files, for a total of five now
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
partition1
.create_parquet_file_catalog_record(builder.clone())
.await;
partition1.create_parquet_file_catalog_record(builder).await;
let candidates = warm_partitions_for_shard(
Arc::clone(&catalog.catalog),
Some(shard1.shard.id),
500,
// recall that we created 5 small files above
5,
10,
)
.await
.unwrap();
assert_eq!(candidates.len(), 1);
}
const DEFAULT_MAX_NUM_PARTITION_CANDIDATES: usize = 100;
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4;
const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24;
const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24;
const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20;
const DEFAULT_MINUTES_WITHOUT_NEW_WRITES: u64 = 8 * 60;
const DEFAULT_MIN_ROWS_ALLOCATED: u64 = 100;
fn make_compactor_config(
max_desired_file_size_bytes: u64,
@ -467,185 +101,27 @@ mod tests {
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage: 80,
max_number_partitions_per_shard: 1,
max_number_partitions_per_shard: DEFAULT_MAX_NUM_PARTITION_CANDIDATES,
min_number_recent_ingested_files_per_partition: 1,
hot_multiple: 4,
warm_multiple: 1,
memory_budget_bytes: 10 * 1024 * 1024,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: DEFAULT_MIN_ROWS_ALLOCATED,
max_num_compacting_files: 20,
max_num_compacting_files_first_in_partition: 40,
minutes_without_new_writes_to_be_cold: 10,
cold_partition_candidates_hours_threshold: 24,
hot_compaction_hours_threshold_1: 4,
hot_compaction_hours_threshold_2: 24,
max_parallel_partitions: 20,
minutes_without_new_writes_to_be_cold: DEFAULT_MINUTES_WITHOUT_NEW_WRITES,
cold_partition_candidates_hours_threshold:
DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD,
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS,
warm_partition_candidates_hours_threshold:
DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD,
warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count,
}
}
/// Integration test for `warm_partitions_to_compact`: seven partitions spread over two shards
/// are filled with different mixes of files, and only the two partitions holding at least ten
/// small L1 files (one per shard) are expected back as candidates.
#[tokio::test]
async fn test_warm_partitions_to_compact() {
let TestSetup {
catalog,
shard1,
table1,
shard2,
} = test_setup().await;
// Seven initially empty partitions: six on shard 1 and the seventh on shard 2
let partition1 = table1.with_shard(&shard1).create_partition("waḥid").await;
let partition2 = table1.with_shard(&shard1).create_partition("ʾiṯnān").await;
let partition3 = table1.with_shard(&shard1).create_partition("ṯalāṯah").await;
let partition4 = table1
.with_shard(&shard1)
.create_partition("ʾarbaʿah")
.await;
let partition5 = table1.with_shard(&shard1).create_partition("ḫamsah").await;
let partition6 = table1.with_shard(&shard1).create_partition("sittah").await;
// The only partition created with an explicit sort key
let partition7 = table1
.with_shard(&shard2)
.create_partition_with_sort_key("sabʿah", &["tag1", "time"])
.await;
// Create a compactor
let time_provider = Arc::clone(&catalog.time_provider);
// NOTE(review): per the comments further down, this config presumably sets the small-file
// size threshold to 5000 bytes and the minimum small-file count to 10 — confirm against
// `make_compactor_config`
let config = make_compactor_config(10_000, 5_000, 10, 30);
let compactor = Arc::new(Compactor::new(
ShardAssignment::Only(vec![shard1.shard.id, shard2.shard.id]),
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")),
catalog.exec(),
time_provider,
BackoffConfig::default(),
config,
Arc::new(metric::Registry::new()),
));
// This test is an integration test that covers the priority of the candidate selection
// algorithm when there are many files of different kinds across many partitions.
// Warm compaction candidate selection doesn't care about creation time.
// partition1 has a deleted L1, isn't returned
let builder = TestParquetFileBuilder::default()
.with_to_delete(true)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
let _pf1 = partition1.create_parquet_file_catalog_record(builder).await;
// partition2 has a non-L1 file, isn't returned
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Initial)
.with_file_size_bytes(100);
let _pf2 = partition2.create_parquet_file_catalog_record(builder).await;
// partition3 has too few small L1 files (min. is 10), isn't returned
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
for _n in 1..=9 {
let _pf3 = partition3
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// partition4 has plenty of files but not enough small ones, isn't returned
// (nine small L1 files ...
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
for _n in 1..=9 {
let _pf4 = partition4
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// ... plus two L1 files that are too large to count)
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(10000); // over the threshold of 5000
for _n in 1..=2 {
let _pf4 = partition4
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// partition5 has plenty of files but not enough small L1s, isn't returned
// (nine small L1 files ...
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
for _n in 1..=9 {
let _pf5 = partition5
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// ... plus two small L0 files, which don't count)
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Initial)
.with_file_size_bytes(100);
for _n in 1..=2 {
let _pf5 = partition5
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// partition6 has many small L1 files, is returned
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
for _n in 1..=10 {
let _pf6 = partition6
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// partition7 has many small L1 files, plus some other things, is returned
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100);
for _n in 1..=10 {
let _pf7 = partition7
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// the "other things": two small L0 files ...
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Initial)
.with_file_size_bytes(100);
for _n in 1..=2 {
let _pf7 = partition7
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// ... two small L2 files ...
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Final)
.with_file_size_bytes(100);
for _n in 1..=2 {
let _pf7 = partition7
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// ... and two large L2 files
let builder = TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Final)
.with_file_size_bytes(10000);
for _n in 1..=2 {
let _pf7 = partition7
.create_parquet_file_catalog_record(builder.clone())
.await;
}
// Will have 2 candidates, one for each shard
let mut candidates = warm_partitions_to_compact(Arc::clone(&compactor))
.await
.unwrap();
// Sort so the assertions below can rely on a stable order
candidates.sort_by_key(|c| c.candidate);
assert_eq!(candidates.len(), 2);
assert_eq!(candidates[0].id(), partition6.partition.id);
// this sort key is None
assert_eq!(candidates[0].sort_key, partition6.partition.sort_key());
assert_eq!(candidates[1].id(), partition7.partition.id);
// this sort key is Some(tag1, time)
assert_eq!(candidates[1].sort_key, partition7.partition.sort_key());
}
struct TestFileToCreate {
lp: String,
max_seq: i64,
@ -769,7 +245,11 @@ mod tests {
// there will only be 1 because all files in the same partition
let compactor = Arc::new(compactor);
let partition_candidates = warm_partitions_to_compact(Arc::clone(&compactor))
let compaction_type = CompactionType::Warm;
let hour_threshold = compactor.config.warm_partition_candidates_hours_threshold;
let max_num_partitions = compactor.config.max_number_partitions_per_shard;
let partition_candidates = compactor
.partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions)
.await
.unwrap();
assert_eq!(partition_candidates.len(), 1);
@ -967,7 +447,11 @@ mod tests {
// there will only be 1 because all files in the same partition
let compactor = Arc::new(compactor);
let partition_candidates = warm_partitions_to_compact(Arc::clone(&compactor))
let compaction_type = CompactionType::Warm;
let hour_threshold = compactor.config.warm_partition_candidates_hours_threshold;
let max_num_partitions = compactor.config.max_number_partitions_per_shard;
let partition_candidates = compactor
.partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions)
.await
.unwrap();
assert_eq!(partition_candidates.len(), 1);
@ -1160,7 +644,11 @@ mod tests {
// there will only be 1 because all files in the same partition
let compactor = Arc::new(compactor);
let partition_candidates = warm_partitions_to_compact(Arc::clone(&compactor))
let compaction_type = CompactionType::Warm;
let hour_threshold = compactor.config.warm_partition_candidates_hours_threshold;
let max_num_partitions = compactor.config.max_number_partitions_per_shard;
let partition_candidates = compactor
.partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions)
.await
.unwrap();
assert_eq!(partition_candidates.len(), 1);
@ -1340,7 +828,11 @@ mod tests {
// there will only be 1 because all files in the same partition
let compactor = Arc::new(compactor);
let partition_candidates = warm_partitions_to_compact(Arc::clone(&compactor))
let compaction_type = CompactionType::Warm;
let hour_threshold = compactor.config.warm_partition_candidates_hours_threshold;
let max_num_partitions = compactor.config.max_number_partitions_per_shard;
let partition_candidates = compactor
.partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions)
.await
.unwrap();
assert_eq!(partition_candidates.len(), 1);

View File

@ -461,6 +461,7 @@ impl Config {
hot_compaction_hours_threshold_1: 4,
hot_compaction_hours_threshold_2: 24,
max_parallel_partitions: 20,
warm_partition_candidates_hours_threshold: 24,
warm_compaction_small_size_threshold_bytes: 15_000,
warm_compaction_min_small_file_count: 10,
};

View File

@ -213,6 +213,7 @@ pub async fn build_compactor2_from_config(
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
max_parallel_partitions,
warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count,
} = compactor_config;
@ -234,6 +235,7 @@ pub async fn build_compactor2_from_config(
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
max_parallel_partitions,
warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count,
};
@ -358,6 +360,7 @@ pub async fn build_compactor_from_config(
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
max_parallel_partitions,
warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count,
..
@ -380,6 +383,7 @@ pub async fn build_compactor_from_config(
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
max_parallel_partitions,
warm_partition_candidates_hours_threshold,
warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count,
};