feat: add file size into estimated memory (#5837)

* feat: add file size into estimated memory

* chore: cleanup

* chore: fmt

* chore: Apply suggestions from code review

Co-authored-by: Andrew Lamb <alamb@influxdata.com>

* chore: run fmt after applying review suggestion

* fix: fix tests to work with the change for the review suggestion

Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Nga Tran 2022-10-12 10:42:53 -04:00 committed by GitHub
parent c9672bdc65
commit f05ca867a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 44 deletions

View File

@ -937,7 +937,7 @@ mod tests {
let mut config = make_compactor_config();
// Set the memory budget such that only some of the files will be compacted in a group
config.memory_budget_bytes = 1050;
config.memory_budget_bytes = 18000;
let metrics = Arc::new(metric::Registry::new());
let compactor = Arc::new(Compactor::new(
@ -951,9 +951,6 @@ mod tests {
Arc::clone(&metrics),
));
// parquet files that are all in the same partition
let mut size_overrides = HashMap::<ParquetFileId, i64>::default();
// pf1, L1, overlaps with lp5 (L2)
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
@ -962,11 +959,9 @@ mod tests {
.with_max_time(20)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
// p1 file size: 1757
let pf1 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf1.parquet_file.id,
compactor.config.max_desired_file_size_bytes as i64 + 10,
);
println!("=== p1 file size: {:#?}", pf1.parquet_file.file_size_bytes);
// pf2, L2, overlaps with lp3 (L1)
let builder = TestParquetFileBuilder::default()
@ -976,11 +971,9 @@ mod tests {
.with_max_time(20_000)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::Final);
// p2 file size: 1777
let pf2 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf2.parquet_file.id,
100, // small file
);
println!("=== p2 file size: {:#?}", pf2.parquet_file.file_size_bytes);
// pf3, L1, overlaps with lp2 (L2)
let builder = TestParquetFileBuilder::default()
@ -990,11 +983,9 @@ mod tests {
.with_max_time(25_000)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
// p3 file size: 1777
let pf3 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf3.parquet_file.id,
100, // small file
);
println!("=== p3 file size: {:#?}", pf3.parquet_file.file_size_bytes);
// pf4, L1, does not overlap with any, won't fit in budget with 1, 2, 3, 5
let builder = TestParquetFileBuilder::default()
@ -1004,11 +995,9 @@ mod tests {
.with_max_time(28_000)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
// p4 file size: 2183
let pf4 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf4.parquet_file.id,
100, // small file
);
println!("=== p4 file size: {:#?}", pf4.parquet_file.file_size_bytes);
// pf5, L2, overlaps with lp1 (L1)
let builder = TestParquetFileBuilder::default()
@ -1018,11 +1007,9 @@ mod tests {
.with_max_time(25)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::Final);
// p5 file size: 2183
let pf5 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf5.parquet_file.id,
100, // small file
);
println!("=== p5 file size: {:#?}", pf5.parquet_file.file_size_bytes);
// pf6, L2, does not overlap with any
let builder = TestParquetFileBuilder::default()
@ -1032,11 +1019,9 @@ mod tests {
.with_max_time(91000)
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::Final);
// p6 file size: 2183
let pf6 = partition.create_parquet_file(builder).await;
size_overrides.insert(
pf6.parquet_file.id,
100, // small file
);
println!("=== p6 file size: {:#?}", pf6.parquet_file.file_size_bytes);
// ------------------------------------------------
// Compact

View File

@ -88,17 +88,22 @@ impl ParquetFilesForCompaction {
let mut level_2 = Vec::with_capacity(parquet_files.len());
for parquet_file in parquet_files {
// Estimate the bytes DataFusion needs when scan this file
let estimated_arrow_bytes = partition.estimated_arrow_bytes(
min_num_rows_allocated_per_record_batch_to_datafusion_plan,
parquet_file.row_count,
);
// Add bytes need to read this file into memory
// as well as the bytes needed to buffer the output
let total_estimated_bytes =
estimated_arrow_bytes + (2 * parquet_file.file_size_bytes as u64);
let parquet_file = match size_overrides.get(&parquet_file.id) {
Some(size) => CompactorParquetFile::new_with_size_override(
parquet_file,
estimated_arrow_bytes,
total_estimated_bytes,
*size,
),
None => CompactorParquetFile::new(parquet_file, estimated_arrow_bytes),
None => CompactorParquetFile::new(parquet_file, total_estimated_bytes),
};
match parquet_file.compaction_level() {
CompactionLevel::Initial => level_0.push(parquet_file),
@ -259,9 +264,13 @@ mod tests {
.await
.unwrap();
let parquet_file_file_size_in_mem = 2 * parquet_file.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_0,
vec![CompactorParquetFile::new(parquet_file.parquet_file, 0)]
vec![CompactorParquetFile::new(
parquet_file.parquet_file,
parquet_file_file_size_in_mem
)]
);
assert!(
@ -304,9 +313,13 @@ mod tests {
parquet_files_for_compaction.level_0
);
let parquet_file_file_size_in_mem = 2 * parquet_file.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_1,
vec![CompactorParquetFile::new(parquet_file.parquet_file, 0)]
vec![CompactorParquetFile::new(
parquet_file.parquet_file,
parquet_file_file_size_in_mem
)]
);
}
@ -349,9 +362,13 @@ mod tests {
parquet_files_for_compaction.level_1
);
let parquet_file_file_size_in_mem = 2 * parquet_file.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_2,
vec![CompactorParquetFile::new(parquet_file.parquet_file, 0)]
vec![CompactorParquetFile::new(
parquet_file.parquet_file,
parquet_file_file_size_in_mem
)]
);
}
@ -394,19 +411,31 @@ mod tests {
.await
.unwrap();
let l0_file_size_in_mem = 2 * l0.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_0,
vec![CompactorParquetFile::new(l0.parquet_file, 0)]
vec![CompactorParquetFile::new(
l0.parquet_file,
l0_file_size_in_mem
)]
);
let l1_file_size_in_mem = 2 * l1.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_1,
vec![CompactorParquetFile::new(l1.parquet_file, 0)]
vec![CompactorParquetFile::new(
l1.parquet_file,
l1_file_size_in_mem
)]
);
let l2_file_size_in_mem = 2 * l2.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_2,
vec![CompactorParquetFile::new(l2.parquet_file, 0)]
vec![CompactorParquetFile::new(
l2.parquet_file,
l2_file_size_in_mem
)]
);
}
@ -451,17 +480,31 @@ mod tests {
.await
.unwrap();
let l0_max_seq_50_file_size_in_mem = 2 * l0_max_seq_50.parquet_file.file_size_bytes as u64;
let l0_max_seq_100_file_size_in_mem =
2 * l0_max_seq_100.parquet_file.file_size_bytes as u64;
let l1_file_size_in_mem = 2 * l1.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_0,
vec![
CompactorParquetFile::new(l0_max_seq_50.parquet_file, 0),
CompactorParquetFile::new(l0_max_seq_100.parquet_file, 0),
CompactorParquetFile::new(
l0_max_seq_50.parquet_file,
l0_max_seq_50_file_size_in_mem
),
CompactorParquetFile::new(
l0_max_seq_100.parquet_file,
l0_max_seq_100_file_size_in_mem
),
]
);
assert_eq!(
parquet_files_for_compaction.level_1,
vec![CompactorParquetFile::new(l1.parquet_file, 0)]
vec![CompactorParquetFile::new(
l1.parquet_file,
l1_file_size_in_mem
)]
);
}
@ -522,23 +565,46 @@ mod tests {
.await
.unwrap();
let l0_file_size_in_mem = 2 * l0.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_0,
vec![CompactorParquetFile::new(l0.parquet_file, 0)]
vec![CompactorParquetFile::new(
l0.parquet_file,
l0_file_size_in_mem
)]
);
let l1_min_time_6666_file_size_in_mem =
2 * l1_min_time_6666.parquet_file.file_size_bytes as u64;
let l1_min_time_7777_file_size_in_mem =
2 * l1_min_time_7777.parquet_file.file_size_bytes as u64;
let l1_min_time_8888_file_size_in_mem =
2 * l1_min_time_8888.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_1,
vec![
CompactorParquetFile::new(l1_min_time_6666.parquet_file, 0),
CompactorParquetFile::new(l1_min_time_7777.parquet_file, 0),
CompactorParquetFile::new(l1_min_time_8888.parquet_file, 0),
CompactorParquetFile::new(
l1_min_time_6666.parquet_file,
l1_min_time_6666_file_size_in_mem
),
CompactorParquetFile::new(
l1_min_time_7777.parquet_file,
l1_min_time_7777_file_size_in_mem
),
CompactorParquetFile::new(
l1_min_time_8888.parquet_file,
l1_min_time_8888_file_size_in_mem
),
]
);
let l2_file_size_in_mem = 2 * l2.parquet_file.file_size_bytes as u64;
assert_eq!(
parquet_files_for_compaction.level_2,
vec![CompactorParquetFile::new(l2.parquet_file, 0)]
vec![CompactorParquetFile::new(
l2.parquet_file,
l2_file_size_in_mem
)]
);
}
}