feat: not split output files in the first step of cold compaction (#5781)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Nga Tran 2022-09-30 12:08:03 -04:00 committed by GitHub
parent d171697fd7
commit 2f08a64f16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 284 additions and 161 deletions

View File

@ -45,7 +45,7 @@ pub async fn compact(compactor: Arc<Compactor>, do_full_compact: bool) -> usize
compaction_type,
CompactionLevel::Initial,
compact_in_parallel,
true, // split
false, // no split
candidates.clone().into(),
)
.await;
@ -108,7 +108,7 @@ mod tests {
use backoff::BackoffConfig;
use data_types::{ColumnType, CompactionLevel, ParquetFileId};
use iox_query::exec::Executor;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
use iox_time::{SystemProvider, TimeProvider};
use std::collections::HashMap;
@ -700,166 +700,112 @@ mod tests {
}
}
#[tokio::test]
async fn cold_compaction_first_step_no_split() {
test_helpers::maybe_start_logging();
let TestDb {
catalog,
compactor,
table,
} = make_db_with_4_l0s_2_l1s().await;
// Let do cold compaction first step
//
// Select partition candidates. Must be one becasue all 6 files belong to the same partition
let candidates = compactor.cold_partitions_to_compact(10).await.unwrap();
assert_eq!(candidates.len(), 1);
//
// Cold compaction first step
compact_candidates_with_memory_budget(
Arc::clone(&compactor),
"cold",
CompactionLevel::Initial,
compact_in_parallel,
false, // no split
candidates.clone().into(),
)
.await;
// Should have 2 non-soft-deleted L-1 files:
// . level-1 pf6 with is not compacted because it does not overlap with any L0s
// . new compacted level-1 file as a result of compacting 4 L0s (pf1, pf2, pf3, pf4) and one L1 (pf5)
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 2, "{files:?}");
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
// The initial files and their IDs are: L0 1-4, L1 5-6. The first step of cold compaction took files 1-5
// and compacted them into level-1 with next ID 7
assert_eq!(
files_and_levels,
vec![
(6, CompactionLevel::FileNonOverlapped),
(7, CompactionLevel::FileNonOverlapped)
]
);
}
#[tokio::test]
async fn cold_compaction_first_step_split() {
test_helpers::maybe_start_logging();
let TestDb {
catalog,
compactor,
table,
} = make_db_with_4_l0s_2_l1s().await;
// Let do cold compaction first step
//
// Select partition candidates. Must be one becasue all 6 files belong to the same partition
let candidates = compactor.cold_partitions_to_compact(10).await.unwrap();
assert_eq!(candidates.len(), 1);
//
// Cold compaction first step
compact_candidates_with_memory_budget(
Arc::clone(&compactor),
"cold",
CompactionLevel::Initial,
compact_in_parallel,
true, // split
candidates.clone().into(),
)
.await;
// Should have 3 non-soft-deleted L-1 files:
// . level-1 pf6 is not compacted because it does not overlap with any L0s
// . Two new compacted level-1 files as a result of compacting 4 L0s (pf1, pf2, pf3, pf4) and one L1 (pf5)
// and split the output into 2 files
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 3, "{files:?}");
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
// The initial files and their IDs are: L0 1-4, L1 5-6. The first step of cold compaction took files 1-5
// and compacted them into 2 level-1 with IDs 7 and 8
assert_eq!(
files_and_levels,
vec![
(6, CompactionLevel::FileNonOverlapped),
(7, CompactionLevel::FileNonOverlapped),
(8, CompactionLevel::FileNonOverlapped)
]
);
}
#[tokio::test]
async fn full_cold_compaction_many_files() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
// lp1 does not overlap with any other level 0
let lp1 = vec![
"table,tag1=WA field_int=1000i 10",
"table,tag1=VT field_int=10i 20",
]
.join("\n");
// lp2 overlaps with lp3
let lp2 = vec![
"table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate
"table,tag1=VT field_int=10i 10000",
"table,tag1=UT field_int=70i 20000",
]
.join("\n");
// lp3 overlaps with lp2
let lp3 = vec![
"table,tag1=WA field_int=1500i 8000", // latest duplicate and kept
"table,tag1=VT field_int=10i 6000",
"table,tag1=UT field_int=270i 25000",
]
.join("\n");
// lp4 does not overlap with any
let lp4 = vec![
"table,tag2=WA,tag3=10 field_int=1600i 28000",
"table,tag2=VT,tag3=20 field_int=20i 26000",
]
.join("\n");
// lp5 overlaps with lp1
let lp5 = vec![
"table,tag2=PA,tag3=15 field_int=1601i 9",
"table,tag2=OH,tag3=21 field_int=21i 25",
]
.join("\n");
// lp6 does not overlap with any
let lp6 = vec![
"table,tag2=PA,tag3=15 field_int=81601i 90000",
"table,tag2=OH,tag3=21 field_int=421i 91000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = time.hours_ago(38);
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Arc::new(Compactor::new(
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
Arc::clone(&metrics),
));
// parquet files that are all in the same partition
let mut size_overrides = HashMap::<ParquetFileId, i64>::default();
// pf1 does not overlap with any other level 0
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
.with_max_seq(3)
.with_min_time(10)
.with_max_time(20)
.with_creation_time(time_38_hour_ago);
let pf1 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf1.parquet_file.id,
compactor.config.max_desired_file_size_bytes as i64 + 10,
);
// pf2 overlaps with pf3
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp2)
.with_max_seq(5)
.with_min_time(8_000)
.with_max_time(20_000)
.with_creation_time(time_38_hour_ago);
let pf2 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf2.parquet_file.id,
100, // small file
);
// pf3 overlaps with pf2
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp3)
.with_max_seq(10)
.with_min_time(6_000)
.with_max_time(25_000)
.with_creation_time(time_38_hour_ago);
let pf3 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf3.parquet_file.id,
100, // small file
);
// pf4 does not overlap with any but is small
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp4)
.with_max_seq(18)
.with_min_time(26_000)
.with_max_time(28_000)
.with_creation_time(time_38_hour_ago);
let pf4 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf4.parquet_file.id,
100, // small file
);
// pf5 was created in a previous compaction cycle; overlaps with pf1
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp5)
.with_max_seq(1)
.with_min_time(9)
.with_max_time(25)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let pf5 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf5.parquet_file.id,
100, // small file
);
// pf6 was created in a previous compaction cycle; does not overlap with any
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp6)
.with_max_seq(20)
.with_min_time(90000)
.with_max_time(91000)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let pf6 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf6.parquet_file.id,
100, // small file
);
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(shard.shard.id).await;
assert_eq!(count, 4);
let TestDb {
catalog,
compactor,
table,
} = make_db_with_4_l0s_2_l1s().await;
// ------------------------------------------------
// Compact
@ -878,9 +824,9 @@ mod tests {
.collect();
// The initial files are: L0 1-4, L1 5-6. The first step of cold compaction took files 1-5
// and compacted them and split them into files 7 and 8. The second step of cold compaction
// took 6, 7, and 8 and combined them all into file 9.
assert_eq!(files_and_levels, vec![(9, CompactionLevel::Final)]);
// and compacted them into a l-1 file 7. The second step of cold compaction
// took 6 and 7 and combined them all into file 8.
assert_eq!(files_and_levels, vec![(8, CompactionLevel::Final)]);
// ------------------------------------------------
// Verify the parquet file content
@ -913,6 +859,7 @@ mod tests {
#[tokio::test]
async fn full_cold_compaction_new_level_1_overlapping_with_level_2() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
// lp1 will be level 1 with min time 10, overlaps with lp5 (L2)
@ -1142,4 +1089,180 @@ mod tests {
&batches
);
}
struct TestDb {
catalog: Arc<TestCatalog>,
compactor: Arc<Compactor>,
table: Arc<TestTable>,
}
async fn make_db_with_4_l0s_2_l1s() -> TestDb {
let catalog = TestCatalog::new();
// lp1 does not overlap with any other level 0
let lp1 = vec![
"table,tag1=WA field_int=1000i 10",
"table,tag1=VT field_int=10i 20",
]
.join("\n");
// lp2 overlaps with lp3
let lp2 = vec![
"table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate
"table,tag1=VT field_int=10i 10000",
"table,tag1=UT field_int=70i 20000",
]
.join("\n");
// lp3 overlaps with lp2
let lp3 = vec![
"table,tag1=WA field_int=1500i 8000", // latest duplicate and kept
"table,tag1=VT field_int=10i 6000",
"table,tag1=UT field_int=270i 25000",
]
.join("\n");
// lp4 does not overlap with any
let lp4 = vec![
"table,tag2=WA,tag3=10 field_int=1600i 28000",
"table,tag2=VT,tag3=20 field_int=20i 26000",
]
.join("\n");
// lp5 overlaps with lp1
let lp5 = vec![
"table,tag2=PA,tag3=15 field_int=1601i 9",
"table,tag2=OH,tag3=21 field_int=21i 25",
]
.join("\n");
// lp6 does not overlap with any
let lp6 = vec![
"table,tag2=PA,tag3=15 field_int=81601i 90000",
"table,tag2=OH,tag3=21 field_int=421i 91000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = time.hours_ago(38);
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Arc::new(Compactor::new(
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
Arc::clone(&metrics),
));
// parquet files that are all in the same partition
let mut size_overrides = HashMap::<ParquetFileId, i64>::default();
// pf1 does not overlap with any other level 0
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
.with_max_seq(3)
.with_min_time(10)
.with_max_time(20)
.with_creation_time(time_38_hour_ago);
let pf1 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf1.parquet_file.id,
compactor.config.max_desired_file_size_bytes as i64 + 10,
);
// pf2 overlaps with pf3
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp2)
.with_max_seq(5)
.with_min_time(8_000)
.with_max_time(20_000)
.with_creation_time(time_38_hour_ago);
let pf2 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf2.parquet_file.id,
100, // small file
);
// pf3 overlaps with pf2
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp3)
.with_max_seq(10)
.with_min_time(6_000)
.with_max_time(25_000)
.with_creation_time(time_38_hour_ago);
let pf3 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf3.parquet_file.id,
100, // small file
);
// pf4 does not overlap with any but is small
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp4)
.with_max_seq(18)
.with_min_time(26_000)
.with_max_time(28_000)
.with_creation_time(time_38_hour_ago);
let pf4 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf4.parquet_file.id,
100, // small file
);
// pf5 was created in a previous compaction cycle; overlaps with pf1
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp5)
.with_max_seq(1)
.with_min_time(9)
.with_max_time(25)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let pf5 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf5.parquet_file.id,
100, // small file
);
// pf6 was created in a previous compaction cycle; does not overlap with any
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp6)
.with_max_seq(20)
.with_min_time(90000)
.with_max_time(91000)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let pf6 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf6.parquet_file.id,
100, // small file
);
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(shard.shard.id).await;
assert_eq!(count, 4);
// should have two level-1 file. Thus total L-0 and l-1 will be six before compacting
let all_files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(all_files.len(), 6);
TestDb {
catalog: Arc::clone(&catalog),
compactor: Arc::clone(&compactor),
table: Arc::clone(&table),
}
}
}