test: Verify L1 files chosen for compaction are limited by the memory budget
parent 3cdf2556ec
commit cdd01eb3fc
@@ -967,11 +967,125 @@ mod tests {
        // Both files get upgraded from level 1 to level 2 without actually doing compaction,
        // so their IDs stay the same.
        assert_eq!(
            files_and_levels,
            vec![(1, CompactionLevel::Final), (2, CompactionLevel::Final),]
        );
    }

    #[tokio::test]
    async fn level_1_files_limited_by_estimated_size() {
        test_helpers::maybe_start_logging();

        let catalog = TestCatalog::new();

        // lp1 will be level 1 with min time 26000, no overlaps
        let lp1 = vec![
            "table,tag2=WA,tag3=10 field_int=1600i 28000",
            "table,tag2=VT,tag3=20 field_int=20i 26000",
        ]
        .join("\n");

        // lp2 will be level 1 with min time 90000, no overlaps
        let lp2 = vec![
            "table,tag2=PA,tag3=15 field_int=81601i 90000",
            "table,tag2=OH,tag3=21 field_int=421i 91000",
        ]
        .join("\n");

        // lp3 will be level 1 with min time 350000, no overlaps
        let lp3 = vec![
            "table,tag2=ND,tag3=8 field_int=333i 350000",
            "table,tag2=SD,tag3=207 field_int=999i 360000",
        ]
        .join("\n");

        let ns = catalog.create_namespace("ns").await;
        let shard = ns.create_shard(1).await;
        let table = ns.create_table("table").await;
        table.create_column("field_int", ColumnType::I64).await;
        table.create_column("tag1", ColumnType::Tag).await;
        table.create_column("tag2", ColumnType::Tag).await;
        table.create_column("tag3", ColumnType::Tag).await;
        table.create_column("time", ColumnType::Time).await;

        let partition = table.with_shard(&shard).create_partition("part").await;
        let time = Arc::new(SystemProvider::new());
        let time_38_hour_ago = time.hours_ago(38);
        let mut config = make_compactor_config();

        // Set the memory budget such that two of the files will be compacted in a group
        config.memory_budget_bytes = 9_200;
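        // Rough arithmetic, using the ~4,590-byte per-file estimates noted in the comments
        // below: 2 * 4,590 = 9,180 fits within this budget, while 3 * 4,590 = 13,770 would
        // exceed it.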

        let metrics = Arc::new(metric::Registry::new());
        let compactor = Arc::new(Compactor::new(
            vec![shard.shard.id],
            Arc::clone(&catalog.catalog),
            ParquetStorage::new(Arc::clone(&catalog.object_store)),
            Arc::new(Executor::new(1)),
            Arc::new(SystemProvider::new()),
            BackoffConfig::default(),
            config,
            Arc::clone(&metrics),
        ));

        // pf1, L1
        let builder = TestParquetFileBuilder::default()
            .with_line_protocol(&lp1)
            .with_max_seq(1)
            .with_min_time(26_000)
            .with_max_time(28_000)
            .with_creation_time(time_38_hour_ago)
            .with_compaction_level(CompactionLevel::FileNonOverlapped);
        // pf1 file size: 2183, estimated file bytes: 4590
        let pf1 = partition.create_parquet_file(builder).await;
        println!("=== pf1 file size: {:#?}", pf1.parquet_file.file_size_bytes);

        // pf2, L1
        let builder = TestParquetFileBuilder::default()
            .with_line_protocol(&lp2)
            .with_max_seq(20)
            .with_min_time(90_000)
            .with_max_time(91_000)
            .with_creation_time(time_38_hour_ago)
            .with_compaction_level(CompactionLevel::FileNonOverlapped);
        // pf2 file size: 2183, estimated file bytes: 4590
        let pf2 = partition.create_parquet_file(builder).await;
        println!("=== pf2 file size: {:#?}", pf2.parquet_file.file_size_bytes);

        // pf3, L1
        let builder = TestParquetFileBuilder::default()
            .with_line_protocol(&lp3)
            .with_max_seq(25)
            .with_min_time(350_000)
            .with_max_time(360_000)
            .with_creation_time(time_38_hour_ago)
            .with_compaction_level(CompactionLevel::FileNonOverlapped);
        // pf3 file size: 2183, estimated file bytes: 4590
        let pf3 = partition.create_parquet_file(builder).await;
        println!("=== pf3 file size: {:#?}", pf3.parquet_file.file_size_bytes);

        // ------------------------------------------------
        // Compact

        compact(compactor, true).await;

        let files = catalog.list_by_table_not_to_delete(table.table.id).await;
        assert_eq!(files.len(), 3, "{files:?}");
        let files_and_levels: Vec<_> = files
            .iter()
            .map(|f| (f.id.get(), f.compaction_level))
            .collect();

        // Compaction will select files 1 and 2 because adding file 3 would go over the memory
        // budget. File 3 remains untouched. Files 1 and 2 are compacted and then split into files
        // 4 and 5 at level 2.
        assert_eq!(
            files_and_levels,
            vec![
                (1, CompactionLevel::Final),
                (2, CompactionLevel::Final),
                (3, CompactionLevel::FileNonOverlapped),
                (4, CompactionLevel::Final),
                (5, CompactionLevel::Final),
            ]
        );
    }