@@ -238,18 +238,22 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
         }

         // --------------------------------------------------------------------
-        // 4. Almost hitting max budget (only 10% left)
-        // OR no more candidates
-        // OR already considered all remaining candidates.
+        // 4. Compact the candidates in parallel_compacting_candidates if one of these conditions hits:
+        //    . candidates in parallel_compacting_candidates consume almost all the budget
+        //    . no more candidates
+        //    . already considered all remaining candidates.
+        //    . hit the max number of partitions to compact in parallel
         if (!parallel_compacting_candidates.is_empty())
             && ((remaining_budget_bytes <= (compactor.config.memory_budget_bytes / 10) as u64)
                 || (candidates.is_empty())
-                || (count == num_remaining_candidates))
+                || (count == num_remaining_candidates)
+                || (count as u64 == compactor.config.max_parallel_partitions))
         {
             debug!(
                 num_parallel_compacting_candidates = parallel_compacting_candidates.len(),
                 total_needed_memory_budget_bytes =
                     compactor.config.memory_budget_bytes - remaining_budget_bytes,
+                config_max_parallel_partitions = compactor.config.max_parallel_partitions,
                 compaction_type,
                 "parallel compacting candidate"
             );
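The condition above is the heart of this change: a batch collected in parallel_compacting_candidates is flushed to parallel compaction when budget headroom drops to 10% or less, when candidates run out, when every remaining candidate has been considered, or (new here) when the batch reaches max_parallel_partitions. A minimal standalone sketch of that decision, with my own names and a simplified count rather than the IOx signatures:

fn should_flush_batch(
    batch_len: usize,               // parallel_compacting_candidates.len()
    remaining_budget_bytes: u64,
    memory_budget_bytes: u64,       // compactor.config.memory_budget_bytes
    no_more_candidates: bool,       // candidates.is_empty()
    considered_all_remaining: bool, // count == num_remaining_candidates
    max_parallel_partitions: u64,
) -> bool {
    batch_len > 0
        && (remaining_budget_bytes <= memory_budget_bytes / 10 // <= 10% headroom left
            || no_more_candidates
            || considered_all_remaining
            || batch_len as u64 == max_parallel_partitions) // the new parallelism cap
}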
@@ -452,7 +456,9 @@ pub mod tests {
     use arrow_util::assert_batches_sorted_eq;
     use backoff::BackoffConfig;
     use data_types::{ColumnType, CompactionLevel, ParquetFileId};
-    use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable};
+    use iox_tests::util::{
+        TestCatalog, TestParquetFileBuilder, TestPartition, TestShard, TestTable,
+    };
     use iox_time::{SystemProvider, TimeProvider};
     use std::{
         collections::{HashMap, VecDeque},
@@ -462,6 +468,7 @@ pub mod tests {

     const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4;
     const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24;
+    const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20;

     // In tests that are verifying successful compaction not affected by the memory budget, this
     // converts a `parquet_file_filtering::FilteredFiles` that has a `filter_result` of
@@ -500,7 +507,7 @@ pub mod tests {
             compactor,
             mock_compactor,
             ..
-        } = test_setup(14350).await;
+        } = test_setup(14350, 20).await;

         let sorted_candidates = VecDeque::new();

@@ -563,7 +570,7 @@ pub mod tests {
         }
     }

-    fn make_compactor_config(budget: u64) -> CompactorConfig {
+    fn make_compactor_config(budget: u64, max_parallel_jobs: u64) -> CompactorConfig {
         // All numbers in here are chosen carefully for many tests.
         // Change them will break the tests
         CompactorConfig {
@@ -580,6 +587,7 @@ pub mod tests {
             minutes_without_new_writes_to_be_cold: 10,
             hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
             hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
+            max_parallel_partitions: max_parallel_jobs,
         }
     }

@@ -591,10 +599,10 @@ pub mod tests {
     }

     pub(crate) async fn test_setup_with_default_budget() -> TestSetup {
-        test_setup(14350).await
+        test_setup(14350, 20).await
     }

-    pub(crate) async fn test_setup(budget: u64) -> TestSetup {
+    pub(crate) async fn test_setup(budget: u64, max_parallel_jobs: u64) -> TestSetup {
         let catalog = TestCatalog::new();
         let namespace = catalog
             .create_namespace_1hr_retention("namespace_hot_partitions_to_compact")
@@ -617,7 +625,7 @@ pub mod tests {

         // Create a compactor
         let time_provider = Arc::new(SystemProvider::new());
-        let config = make_compactor_config(budget);
+        let config = make_compactor_config(budget, max_parallel_jobs);
         let compactor = Arc::new(Compactor::new(
             vec![shard.shard.id],
             Arc::clone(&catalog.catalog),
@@ -640,161 +648,11 @@ pub mod tests {
     }

     #[tokio::test]
-    async fn test_hot_compact_candidates_with_memory_budget() {
+    async fn test_hot_compact_candidates_with_limit_memory_budget() {
         test_helpers::maybe_start_logging();

-        let TestSetup {
-            compactor,
-            mock_compactor,
-            shard,
-            table,
-            ..
-        } = test_setup(14350).await;
-
-        // Some times in the past to set to created_at of the files
-        let hot_time_one_hour_ago = compactor.time_provider.hours_ago(1);
-
-        // P1:
-        // L0 2 rows. bytes: 2,250
-        // L1 2 rows. bytes: 2,250
-        // total = 2,250 + 2,250 = 4,500
-        let partition1 = table.with_shard(&shard).create_partition("one").await;
-
-        // 2 files with IDs 1 and 2
-        let pf1_1 = TestParquetFileBuilder::default()
-            .with_min_time(1)
-            .with_max_time(5)
-            .with_row_count(2)
-            .with_compaction_level(CompactionLevel::Initial)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition1.create_parquet_file_catalog_record(pf1_1).await;
-
-        let pf1_2 = TestParquetFileBuilder::default()
-            .with_min_time(4) // overlapped with pf1_1
-            .with_max_time(6)
-            .with_row_count(2)
-            .with_compaction_level(CompactionLevel::FileNonOverlapped)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition1.create_parquet_file_catalog_record(pf1_2).await;
-
-        // P2:
-        // L0 2 rows. bytes: 2,250
-        // L1 2 rows. bytes: 2,250
-        // total = 2,250 + 2,250 = 4,500
-        let partition2 = table.with_shard(&shard).create_partition("two").await;
-
-        // 2 files with IDs 3 and 4
-        let pf2_1 = TestParquetFileBuilder::default()
-            .with_min_time(1)
-            .with_max_time(5)
-            .with_row_count(2)
-            .with_compaction_level(CompactionLevel::Initial)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition2.create_parquet_file_catalog_record(pf2_1).await;
-
-        let pf2_2 = TestParquetFileBuilder::default()
-            .with_min_time(4) // overlapped with pf2_1
-            .with_max_time(6)
-            .with_row_count(2)
-            .with_compaction_level(CompactionLevel::FileNonOverlapped)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition2.create_parquet_file_catalog_record(pf2_2).await;
-
-        // P3: bytes >= 90% of full budget = 90% * 14,350 = 12,915
-        // L0 40 rows. bytes: 2,250
-        // Five L1s. bytes: 2,250 each
-        // total = 2,250 * 6 = 13,500
-        let partition3 = table.with_shard(&shard).create_partition("three").await;
-
-        // 6 files with IDs 5, 6, 7, 8, 9, 10
-        let pf3_1 = TestParquetFileBuilder::default()
-            .with_min_time(1)
-            .with_max_time(6)
-            .with_row_count(40)
-            .with_compaction_level(CompactionLevel::Initial)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition3.create_parquet_file_catalog_record(pf3_1).await;
-
-        // Five overlapped L1 files
-        for i in 1..6 {
-            let pf3_i = TestParquetFileBuilder::default()
-                .with_min_time(i) // overlapped with pf3_1
-                .with_max_time(i)
-                .with_row_count(24)
-                .with_compaction_level(CompactionLevel::FileNonOverlapped)
-                .with_creation_time(hot_time_one_hour_ago);
-            partition3.create_parquet_file_catalog_record(pf3_i).await;
-        }
-
-        // P4: Over the full budget
-        // L0 40 rows. bytes: 2,250
-        // Six L1s. bytes: 2,250 each
-        // total = 2,250 * 7 = 15,750 > 14350
-        let partition4 = table.with_shard(&shard).create_partition("four").await;
-
-        // 7 files with IDs 11, 12, 13, 14, 15, 16, 17
-        let pf4_1 = TestParquetFileBuilder::default()
-            .with_min_time(1)
-            .with_max_time(7)
-            .with_row_count(70)
-            .with_compaction_level(CompactionLevel::Initial)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition4.create_parquet_file_catalog_record(pf4_1).await;
-
-        // Six overlapped L1 files
-        for i in 1..7 {
-            let pf4_i = TestParquetFileBuilder::default()
-                .with_min_time(i) // overlapped with pf4_1
-                .with_max_time(i)
-                .with_row_count(40)
-                .with_compaction_level(CompactionLevel::FileNonOverlapped)
-                .with_creation_time(hot_time_one_hour_ago);
-            partition4.create_parquet_file_catalog_record(pf4_i).await;
-        }
-
-        // P5:
-        // L0 2 rows. bytes: 2,250
-        // L1 2 rows. bytes: 2,250
-        // total = 2,250 + 2,250 = 4,500
-        let partition5 = table.with_shard(&shard).create_partition("five").await;
-        // 2 files with IDs 18, 19
-        let pf5_1 = TestParquetFileBuilder::default()
-            .with_min_time(1)
-            .with_max_time(5)
-            .with_row_count(2)
-            .with_compaction_level(CompactionLevel::Initial)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition5.create_parquet_file_catalog_record(pf5_1).await;
-
-        let pf5_2 = TestParquetFileBuilder::default()
-            .with_min_time(4) // overlapped with pf5_1
-            .with_max_time(6)
-            .with_row_count(2)
-            .with_compaction_level(CompactionLevel::FileNonOverlapped)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition5.create_parquet_file_catalog_record(pf5_2).await;
-
-        // P6:
-        // L0 2 rows. bytes: 2,250
-        // L1 2 rows. bytes: 2,250
-        // total = 2,250 + 2,250 = 4,500
-        let partition6 = table.with_shard(&shard).create_partition("six").await;
-        // 2 files with IDs 20, 21
-        let pf6_1 = TestParquetFileBuilder::default()
-            .with_min_time(1)
-            .with_max_time(5)
-            .with_row_count(2)
-            .with_compaction_level(CompactionLevel::Initial)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition6.create_parquet_file_catalog_record(pf6_1).await;
-
-        let pf6_2 = TestParquetFileBuilder::default()
-            .with_min_time(4) // overlapped with pf6_1
-            .with_max_time(6)
-            .with_row_count(2)
-            .with_compaction_level(CompactionLevel::FileNonOverlapped)
-            .with_creation_time(hot_time_one_hour_ago);
-        partition6.create_parquet_file_catalog_record(pf6_2).await;
+        // test setup with limited memory budget, 14350, and very large (aka unlimited in this test) max_parallel_jobs, 200
+        let (compactor, mock_compactor, partitions) = make_6_partitions(14350, 200).await;

         // partition candidates: partitions with L0 and overlapped L1
         let mut candidates = hot::hot_partitions_to_compact(Arc::clone(&compactor))
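For orientation, the budget arithmetic that drives the grouping asserted below — a worked sketch of mine, with the sizes taken from the file-size comments in make_6_partitions:

fn main() {
    let budget: u64 = 14_350;
    let small: u64 = 2 * 2_250; // P1, P2, P5, P6: one L0 plus one L1 file
    let p3: u64 = 6 * 2_250;    // 13,500: over 90% of the budget, so P3 must run alone
    let p4: u64 = 7 * 2_250;    // 15,750: over the full budget, so P4 is skipped
    assert!(3 * small <= budget && 4 * small > budget); // at most three small partitions per round
    assert!(p3 <= budget && p3 > budget / 10 * 9);
    assert!(p4 > budget);
}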
@@ -838,19 +696,19 @@ pub mod tests {
         assert_eq!(group1.len(), 3);

         let g1_candidate1 = &group1[0];
-        assert_eq!(g1_candidate1.partition.id(), partition1.partition.id);
+        assert_eq!(g1_candidate1.partition.id(), partitions[0].partition.id);
         let g1_candidate1_pf_ids: Vec<_> =
             g1_candidate1.files.iter().map(|pf| pf.id().get()).collect();
         assert_eq!(g1_candidate1_pf_ids, vec![2, 1]);

         let g1_candidate2 = &group1[1];
-        assert_eq!(g1_candidate2.partition.id(), partition2.partition.id);
+        assert_eq!(g1_candidate2.partition.id(), partitions[1].partition.id);
         let g1_candidate2_pf_ids: Vec<_> =
             g1_candidate2.files.iter().map(|pf| pf.id().get()).collect();
         assert_eq!(g1_candidate2_pf_ids, vec![4, 3]);

         let g1_candidate3 = &group1[2];
-        assert_eq!(g1_candidate3.partition.id(), partition5.partition.id);
+        assert_eq!(g1_candidate3.partition.id(), partitions[4].partition.id);
         let g1_candidate3_pf_ids: Vec<_> =
             g1_candidate3.files.iter().map(|pf| pf.id().get()).collect();
         assert_eq!(g1_candidate3_pf_ids, vec![19, 18]);
@@ -860,7 +718,7 @@ pub mod tests {
         assert_eq!(group2.len(), 1);

         let g2_candidate1 = &group2[0];
-        assert_eq!(g2_candidate1.partition.id(), partition6.partition.id);
+        assert_eq!(g2_candidate1.partition.id(), partitions[5].partition.id);
         let g2_candidate1_pf_ids: Vec<_> =
             g2_candidate1.files.iter().map(|pf| pf.id().get()).collect();
         assert_eq!(g2_candidate1_pf_ids, vec![21, 20]);
@@ -870,21 +728,99 @@ pub mod tests {
         assert_eq!(group3.len(), 1);

         let g3_candidate1 = &group3[0];
-        assert_eq!(g3_candidate1.partition.id(), partition3.partition.id);
+        assert_eq!(g3_candidate1.partition.id(), partitions[2].partition.id);
         let g3_candidate1_pf_ids: Vec<_> =
             g3_candidate1.files.iter().map(|pf| pf.id().get()).collect();
-        // all IDs of level-1 firts then level-0
+        // all IDs of level-1 first then level-0
         assert_eq!(g3_candidate1_pf_ids, vec![6, 7, 8, 9, 10, 5]);

         {
             let mut repos = compactor.catalog.repositories().await;
             let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
             assert_eq!(skipped_compactions.len(), 1);
-            assert_eq!(skipped_compactions[0].partition_id, partition4.partition.id);
+            assert_eq!(
+                skipped_compactions[0].partition_id,
+                partitions[3].partition.id
+            );
             assert_eq!(skipped_compactions[0].reason, "over memory budget");
         }
     }

+    #[tokio::test]
+    async fn test_hot_compact_candidates_with_limit_parallel_jobs() {
+        test_helpers::maybe_start_logging();
+
+        // test setup with plenty of memory budget, 1GB (aka unlimited), but limited to 2 parallel jobs
+        let (compactor, mock_compactor, partitions) =
+            make_6_partitions(1024 * 1024 * 1024, 2).await;
+
+        // partition candidates: partitions with L0 and overlapped L1
+        let mut candidates = hot::hot_partitions_to_compact(Arc::clone(&compactor))
+            .await
+            .unwrap();
+        assert_eq!(candidates.len(), 6);
+        candidates.sort_by_key(|c| c.candidate.partition_id);
+        {
+            let mut repos = compactor.catalog.repositories().await;
+            let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
+            assert!(
+                skipped_compactions.is_empty(),
+                "Expected no skipped compactions, got: {skipped_compactions:?}"
+            );
+        }
+
+        // There are 3 rounds of parallel compaction:
+        //
+        // * Round 1: 2 candidates [P1, P2]
+        // * Round 2: 2 candidates [P3, P4]
+        // * Round 3: 2 candidates [P5, P6]
+
+        compact_candidates_with_memory_budget(
+            Arc::clone(&compactor),
+            "hot",
+            CompactionLevel::Initial,
+            mock_compactor.compaction_function(),
+            true,
+            candidates.into(),
+        )
+        .await;
+
+        let compaction_groups = mock_compactor.results();
+
+        // 3 rounds of parallel compaction
+        assert_eq!(compaction_groups.len(), 3);
+
+        // Round 1
+        let group1 = &compaction_groups[0];
+        assert_eq!(group1.len(), 2);
+
+        let g1_candidate1 = &group1[0];
+        assert_eq!(g1_candidate1.partition.id(), partitions[0].partition.id);
+
+        let g1_candidate2 = &group1[1];
+        assert_eq!(g1_candidate2.partition.id(), partitions[1].partition.id);
+
+        // Round 2
+        let group2 = &compaction_groups[1];
+        assert_eq!(group2.len(), 2);
+
+        let g2_candidate1 = &group2[0];
+        assert_eq!(g2_candidate1.partition.id(), partitions[2].partition.id);
+
+        let g2_candidate2 = &group2[1];
+        assert_eq!(g2_candidate2.partition.id(), partitions[3].partition.id);
+
+        // Round 3
+        let group3 = &compaction_groups[2];
+        assert_eq!(group3.len(), 2);
+
+        let g3_candidate1 = &group3[0];
+        assert_eq!(g3_candidate1.partition.id(), partitions[4].partition.id);
+
+        let g3_candidate2 = &group3[1];
+        assert_eq!(g3_candidate2.partition.id(), partitions[5].partition.id);
+    }
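With the budget effectively unlimited, only the new max_parallel_partitions cap shapes the rounds in the test above: six equally sized candidates split into ceil(6 / 2) = 3 rounds of two. A tiny self-contained illustration of mine (not the compactor's batching code):

fn main() {
    let candidates = ["P1", "P2", "P3", "P4", "P5", "P6"];
    let rounds: Vec<_> = candidates.chunks(2).collect(); // max_parallel_jobs = 2
    assert_eq!(rounds.len(), 3);
    assert_eq!(rounds[0], ["P1", "P2"]);
    assert_eq!(rounds[2], ["P5", "P6"]);
}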

     // A quite sophisticated integration test of compacting one hot partition
     // Beside lp data, every value min/max sequence numbers and min/max time are important
     // to have a combination of needed tests in this test function
@@ -962,6 +898,7 @@ pub mod tests {
             minutes_without_new_writes_to_be_cold: 10,
             hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
             hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
+            max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS,
         };

         let metrics = Arc::new(metric::Registry::new());
@@ -1153,4 +1090,172 @@ pub mod tests {
             &batches
         );
     }
+
+    async fn make_6_partitions(
+        budget: u64,
+        max_parallel_jobs: u64,
+    ) -> (Arc<Compactor>, MockCompactor, Vec<Arc<TestPartition>>) {
+        let TestSetup {
+            compactor,
+            mock_compactor,
+            shard,
+            table,
+            ..
+        } = test_setup(budget, max_parallel_jobs).await;
+
+        // Some times in the past to set to created_at of the files
+        let hot_time_one_hour_ago = compactor.time_provider.hours_ago(1);
+
+        let mut partitions = Vec::with_capacity(6);
+
+        // P1:
+        // L0 2 rows. bytes: 2,250
+        // L1 2 rows. bytes: 2,250
+        // total = 2,250 + 2,250 = 4,500
+        let partition1 = table.with_shard(&shard).create_partition("one").await;
+
+        // 2 files with IDs 1 and 2
+        let pf1_1 = TestParquetFileBuilder::default()
+            .with_min_time(1)
+            .with_max_time(5)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::Initial)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition1.create_parquet_file_catalog_record(pf1_1).await;
+
+        let pf1_2 = TestParquetFileBuilder::default()
+            .with_min_time(4) // overlapped with pf1_1
+            .with_max_time(6)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition1.create_parquet_file_catalog_record(pf1_2).await;
+        partitions.push(partition1);
+
+        // P2:
+        // L0 2 rows. bytes: 2,250
+        // L1 2 rows. bytes: 2,250
+        // total = 2,250 + 2,250 = 4,500
+        let partition2 = table.with_shard(&shard).create_partition("two").await;
+
+        // 2 files with IDs 3 and 4
+        let pf2_1 = TestParquetFileBuilder::default()
+            .with_min_time(1)
+            .with_max_time(5)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::Initial)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition2.create_parquet_file_catalog_record(pf2_1).await;
+
+        let pf2_2 = TestParquetFileBuilder::default()
+            .with_min_time(4) // overlapped with pf2_1
+            .with_max_time(6)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition2.create_parquet_file_catalog_record(pf2_2).await;
+        partitions.push(partition2);
+
+        // P3: bytes >= 90% of full budget = 90% * 14,350 = 12,915
+        // L0 40 rows. bytes: 2,250
+        // Five L1s. bytes: 2,250 each
+        // total = 2,250 * 6 = 13,500
+        let partition3 = table.with_shard(&shard).create_partition("three").await;
+
+        // 6 files with IDs 5, 6, 7, 8, 9, 10
+        let pf3_1 = TestParquetFileBuilder::default()
+            .with_min_time(1)
+            .with_max_time(6)
+            .with_row_count(40)
+            .with_compaction_level(CompactionLevel::Initial)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition3.create_parquet_file_catalog_record(pf3_1).await;
+
+        // Five overlapped L1 files
+        for i in 1..6 {
+            let pf3_i = TestParquetFileBuilder::default()
+                .with_min_time(i) // overlapped with pf3_1
+                .with_max_time(i)
+                .with_row_count(24)
+                .with_compaction_level(CompactionLevel::FileNonOverlapped)
+                .with_creation_time(hot_time_one_hour_ago);
+            partition3.create_parquet_file_catalog_record(pf3_i).await;
+        }
+        partitions.push(partition3);
+
+        // P4: Over the full budget
+        // L0 40 rows. bytes: 2,250
+        // Six L1s. bytes: 2,250 each
+        // total = 2,250 * 7 = 15,750 > 14350
+        let partition4 = table.with_shard(&shard).create_partition("four").await;
+
+        // 7 files with IDs 11, 12, 13, 14, 15, 16, 17
+        let pf4_1 = TestParquetFileBuilder::default()
+            .with_min_time(1)
+            .with_max_time(7)
+            .with_row_count(70)
+            .with_compaction_level(CompactionLevel::Initial)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition4.create_parquet_file_catalog_record(pf4_1).await;
+
+        // Six overlapped L1 files
+        for i in 1..7 {
+            let pf4_i = TestParquetFileBuilder::default()
+                .with_min_time(i) // overlapped with pf4_1
+                .with_max_time(i)
+                .with_row_count(40)
+                .with_compaction_level(CompactionLevel::FileNonOverlapped)
+                .with_creation_time(hot_time_one_hour_ago);
+            partition4.create_parquet_file_catalog_record(pf4_i).await;
+        }
+        partitions.push(partition4);
+
+        // P5:
+        // L0 2 rows. bytes: 2,250
+        // L1 2 rows. bytes: 2,250
+        // total = 2,250 + 2,250 = 4,500
+        let partition5 = table.with_shard(&shard).create_partition("five").await;
+        // 2 files with IDs 18, 19
+        let pf5_1 = TestParquetFileBuilder::default()
+            .with_min_time(1)
+            .with_max_time(5)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::Initial)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition5.create_parquet_file_catalog_record(pf5_1).await;
+
+        let pf5_2 = TestParquetFileBuilder::default()
+            .with_min_time(4) // overlapped with pf5_1
+            .with_max_time(6)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition5.create_parquet_file_catalog_record(pf5_2).await;
+        partitions.push(partition5);
+
+        // P6:
+        // L0 2 rows. bytes: 2,250
+        // L1 2 rows. bytes: 2,250
+        // total = 2,250 + 2,250 = 4,500
+        let partition6 = table.with_shard(&shard).create_partition("six").await;
+        // 2 files with IDs 20, 21
+        let pf6_1 = TestParquetFileBuilder::default()
+            .with_min_time(1)
+            .with_max_time(5)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::Initial)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition6.create_parquet_file_catalog_record(pf6_1).await;
+
+        let pf6_2 = TestParquetFileBuilder::default()
+            .with_min_time(4) // overlapped with pf6_1
+            .with_max_time(6)
+            .with_row_count(2)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .with_creation_time(hot_time_one_hour_ago);
+        partition6.create_parquet_file_catalog_record(pf6_2).await;
+        partitions.push(partition6);
+
+        (compactor, mock_compactor, partitions)
+    }
 }
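Taken together, the two tests pin down both batching constraints. A self-contained sketch of mine showing their interplay — note it uses a simpler greedy strategy than the compactor, which defers too-large candidates and revisits them, so its exact groupings can differ:

fn plan_rounds(sizes: &[u64], budget: u64, max_parallel: usize) -> Vec<Vec<usize>> {
    let mut rounds: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut used: u64 = 0;
    for (i, &size) in sizes.iter().enumerate() {
        if size > budget {
            continue; // over the full budget: skipped ("over memory budget" in the test)
        }
        if used + size > budget || current.len() == max_parallel {
            rounds.push(std::mem::take(&mut current)); // flush the current round
            used = 0;
        }
        current.push(i);
        used += size;
    }
    if !current.is_empty() {
        rounds.push(current);
    }
    rounds
}

fn main() {
    // Sizes mirror P1..P6 in make_6_partitions.
    let sizes = [4_500, 4_500, 13_500, 15_750, 4_500, 4_500];
    // Parallelism-bound: huge budget, two jobs per round -> 3 rounds.
    assert_eq!(plan_rounds(&sizes, u64::MAX, 2).len(), 3);
    // Budget-bound: P4 (15,750 > 14,350) is never scheduled.
    let rounds = plan_rounds(&sizes, 14_350, usize::MAX);
    assert!(rounds.iter().flatten().all(|&i| i != 3));
}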
|