From bb7df22aa1783e040ea165153876f1fe36838d4e Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Fri, 23 Sep 2022 11:51:25 -0400
Subject: [PATCH] chore: always use a fixed number of rows (8192) per batch to estimate memory (#5733)

---
 compactor/src/compact.rs | 35 +++++++++++++------------
 compactor/src/lib.rs     | 56 +++++++++++++++++++++-------------------
 2 files changed, 47 insertions(+), 44 deletions(-)

diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 7b42ccb728..b389b05e8f 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -18,7 +18,6 @@ use parquet_file::storage::ParquetStorage;
 use schema::sort::SortKey;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
-    cmp::max,
     collections::{HashMap, HashSet},
     sync::Arc,
     time::Duration,
@@ -516,11 +515,9 @@ fn estimate_arrow_bytes_for_file(

     let average_cardinality = row_count / AVERAGE_ROW_COUNT_CARDINALITY_RATIO;

-    // Since there is a minimum number of rows per batch, we will use that minimum number of rows if the row_count is smaller
-    let row_count = max(
-        row_count,
-        min_num_rows_allocated_per_record_batch_to_datafusion_plan as i64,
-    );
+    // Since DataFusion streams files and allocates a fixed (configurable) number of rows per batch,
+    // we always use that number to estimate the memory usage per batch.
+    let row_count_per_batch = min_num_rows_allocated_per_record_batch_to_datafusion_plan as i64;

     // Bytes needed for number columns
     let mut value_bytes = 0;
@@ -531,17 +528,17 @@ fn estimate_arrow_bytes_for_file(
     for c in columns {
         match c.col_type {
             ColumnType::I64 | ColumnType::U64 | ColumnType::F64 | ColumnType::Time => {
-                value_bytes += c.count * row_count * VALUE_BYTE;
+                value_bytes += c.count * row_count_per_batch * VALUE_BYTE;
             }
             ColumnType::String => {
-                string_bytes += row_count * STRING_LENGTH;
+                string_bytes += row_count_per_batch * STRING_LENGTH;
             }
             ColumnType::Bool => {
-                bool_bytes += row_count * BOOL_BYTE;
+                bool_bytes += row_count_per_batch * BOOL_BYTE;
             }
             ColumnType::Tag => {
                 dictionary_key_bytes += average_cardinality * AVERAGE_TAG_VALUE_LENGTH;
-                dictionary_value_bytes = row_count * DICTIONARY_BYTE;
+                dictionary_value_bytes = row_count_per_batch * DICTIONARY_BYTE;
             }
         }
     }
@@ -585,7 +582,9 @@ pub mod tests {
     #[test]
     fn test_estimate_arrow_bytes_for_file_small_row_count() {
         let row_count = 11;
-        let min_num_rows_allocated_per_record_batch = 20; // > row_count and will be used
+        // Always use this config param to estimate memory usage for each batch,
+        // regardless of whether it is larger or smaller than row_count
+        let min_num_rows_allocated_per_record_batch = 20;

         // Time, U64, I64, F64
         let columns = vec![
@@ -649,7 +648,9 @@ pub mod tests {
     #[test]
     fn test_estimate_arrow_bytes_for_file_large_row_count() {
         let row_count = 11;
-        let min_num_rows_allocated_per_record_batch = 10; // < row_count and won't be used
+        // Always use this config param to estimate memory usage for each batch,
+        // even if it is smaller than row_count
+        let min_num_rows_allocated_per_record_batch = 10;

         // Time, U64, I64, F64
         let columns = vec![
@@ -663,7 +664,7 @@ pub mod tests {
             min_num_rows_allocated_per_record_batch,
             row_count,
         );
-        assert_eq!(bytes, 880); // 11 * (1+2+3+4) * 8
+        assert_eq!(bytes, 800); // 10 * (1+2+3+4) * 8

         // Tag
         let columns = vec![ColumnTypeCount::new(ColumnType::Tag, 1)];
@@ -672,7 +673,7 @@ pub mod tests {
             min_num_rows_allocated_per_record_batch,
             row_count,
         );
-        assert_eq!(bytes, 1088); // 5 * 200 + 11 * 8
+        assert_eq!(bytes, 1080); // 5 * 200 + 10 * 8

         // String
         let columns = vec![ColumnTypeCount::new(ColumnType::String, 1)];
@@ -681,7 +682,7 @@ pub mod tests {
             min_num_rows_allocated_per_record_batch,
             row_count,
         );
-        assert_eq!(bytes, 11000); // 11 * 1000
+        assert_eq!(bytes, 10000); // 10 * 1000

         // Bool
         let columns = vec![ColumnTypeCount::new(ColumnType::Bool, 1)];
@@ -690,7 +691,7 @@ pub mod tests {
             min_num_rows_allocated_per_record_batch,
             row_count,
         );
-        assert_eq!(bytes, 11); // 11 * 1
+        assert_eq!(bytes, 10); // 10 * 1

         // all types
         let columns = vec![
@@ -707,7 +708,7 @@ pub mod tests {
             min_num_rows_allocated_per_record_batch,
             row_count,
         );
-        assert_eq!(bytes, 12979); // 880 + 1088 + 11000 + 11
+        assert_eq!(bytes, 11890); // 800 + 1080 + 10000 + 10
     }

     fn make_compactor_config() -> CompactorConfig {
diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs
index 9080275f6e..38823b60fd 100644
--- a/compactor/src/lib.rs
+++ b/compactor/src/lib.rs
@@ -524,6 +524,8 @@ pub mod tests {
     }

     fn make_compactor_config() -> CompactorConfig {
+        // All numbers in here are chosen carefully for many tests.
+        // Changing them will break the tests.
         CompactorConfig {
             max_desired_file_size_bytes: 100_000_000,
             percentage_max_file_size: 90,
@@ -531,8 +533,8 @@ pub mod tests {
             max_number_partitions_per_shard: 100,
             min_number_recent_ingested_files_per_partition: 1,
             hot_multiple: 4,
-            memory_budget_bytes: 12 * 1125, // 13,500 bytes
-            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 1,
+            memory_budget_bytes: 14 * 1025, // 14,350 bytes
+            min_num_rows_allocated_per_record_batch_to_datafusion_plan: 2,
             max_num_compacting_files: 20,
         }
     }
@@ -552,8 +554,8 @@ pub mod tests {
         let shard = namespace.create_shard(1).await;

         // Create a scenario of a table of 5 columns: tag, time, field int, field string, field
-        // bool. Thus, each file will have estimated memory bytes = 1125 * row_count (for even
-        // row_counts).
+        // bool. Thus, given min_num_rows_allocated_per_record_batch_to_datafusion_plan = 2,
+        // each file will have estimated memory bytes = 2050 + 100 * row_count (for even row_counts).
         let table = namespace.create_table("test_table").await;

         table.create_column("tag", ColumnType::Tag).await;
@@ -605,8 +607,8 @@ pub mod tests {
         let hot_time_one_hour_ago = compactor.time_provider.hours_ago(1);

         // P1:
-        //   L0 2 rows. bytes: 1125 * 2 = 2,250
-        //   L1 2 rows. bytes: 1125 * 2 = 2,250
+        //   L0 2 rows. bytes: 2050 + 100*2 = 2,250
+        //   L1 2 rows. bytes: 2050 + 100*2 = 2,250
         //   total = 2,250 + 2,250 = 4,500
         let partition1 = table.with_shard(&shard).create_partition("one").await;

         let pf1_1 = TestParquetFileBuilder::default()
             .with_min_time(1)
             .with_max_time(5)
             .with_row_count(2)
             .with_compaction_level(CompactionLevel::Initial)
             .with_creation_time(hot_time_one_hour_ago);
         partition1.create_parquet_file_catalog_record(pf1_1).await;

         let pf1_2 = TestParquetFileBuilder::default()
             .with_min_time(4) // overlapped with pf1_1
             .with_max_time(6)
             .with_row_count(2)
             .with_compaction_level(CompactionLevel::FileNonOverlapped)
             .with_creation_time(hot_time_one_hour_ago);
         partition1.create_parquet_file_catalog_record(pf1_2).await;

         // P2:
-        //   L0 2 rows. bytes: 1125 * 2 = 2,250
-        //   L1 2 rows. bytes: 1125 * 2 = 2,250
+        //   L0 2 rows. bytes: 2050 + 100*2 = 2,250
+        //   L1 2 rows. bytes: 2050 + 100*2 = 2,250
         //   total = 2,250 + 2,250 = 4,500
         let partition2 = table.with_shard(&shard).create_partition("two").await;

         let pf2_1 = TestParquetFileBuilder::default()
             .with_min_time(1)
             .with_max_time(5)
             .with_row_count(2)
             .with_compaction_level(CompactionLevel::Initial)
             .with_creation_time(hot_time_one_hour_ago);
         partition2.create_parquet_file_catalog_record(pf2_1).await;

         let pf2_2 = TestParquetFileBuilder::default()
             .with_min_time(4) // overlapped with pf2_1
             .with_max_time(6)
             .with_row_count(2)
             .with_compaction_level(CompactionLevel::FileNonOverlapped)
             .with_creation_time(hot_time_one_hour_ago);
         partition2.create_parquet_file_catalog_record(pf2_2).await;

-        // P3: bytes >= 90% of full budget = 90% * 13,500 = 12,150
-        //   L0 6 rows. bytes: 1125 * 6 = 6,750
-        //   L1 4 rows. bytes: 1125 * 4 = 4,500
-        //   total = 6,700 + 4,500 = 12,150
+        // P3: bytes >= 90% of full budget = 90% * 14,350 = 12,915
+        //   L0 50 rows. bytes: 2050 + 100 * 50 = 7,050
+        //   L1 40 rows. bytes: 2050 + 100 * 40 = 6,050
+        //   total = 7,050 + 6,050 = 13,100
         let partition3 = table.with_shard(&shard).create_partition("three").await;

         let pf3_1 = TestParquetFileBuilder::default()
             .with_min_time(1)
             .with_max_time(5)
-            .with_row_count(6)
+            .with_row_count(50)
             .with_compaction_level(CompactionLevel::Initial)
             .with_creation_time(hot_time_one_hour_ago);
         partition3.create_parquet_file_catalog_record(pf3_1).await;

         let pf3_2 = TestParquetFileBuilder::default()
             .with_min_time(4) // overlapped with pf3_1
             .with_max_time(6)
-            .with_row_count(4)
+            .with_row_count(40)
             .with_compaction_level(CompactionLevel::FileNonOverlapped)
             .with_creation_time(hot_time_one_hour_ago);
         partition3.create_parquet_file_catalog_record(pf3_2).await;

         // P4: Over the full budget
-        //   L0 with 8 rows. bytes = 1125 * 8 = 9,000
-        //   L1 with 6 rows. bytes = 1125 * 6 = 6,750
-        //   total = 15,750
+        //   L0 with 70 rows. bytes = 2050 + 100 * 70 = 9,050
+        //   L1 with 40 rows. bytes = 2050 + 100 * 40 = 6,050
+        //   total = 9,050 + 6,050 = 15,100
         let partition4 = table.with_shard(&shard).create_partition("four").await;
         let pf4_1 = TestParquetFileBuilder::default()
             .with_min_time(1)
             .with_max_time(5)
-            .with_row_count(8)
+            .with_row_count(70)
             .with_compaction_level(CompactionLevel::Initial)
             .with_creation_time(hot_time_one_hour_ago);
         partition4.create_parquet_file_catalog_record(pf4_1).await;

         let pf4_2 = TestParquetFileBuilder::default()
             .with_min_time(4) // overlapped with pf4_1
             .with_max_time(6)
-            .with_row_count(6)
+            .with_row_count(40)
             .with_compaction_level(CompactionLevel::FileNonOverlapped)
             .with_creation_time(hot_time_one_hour_ago);
         partition4.create_parquet_file_catalog_record(pf4_2).await;

         // P5:
-        //   L0 with 2 rows. bytes = 1125 * 2 = 2,250
-        //   L1 with 2 rows. bytes = 1125 * 2 = 2,250
-        //   total = 4,500
+        //   L0 2 rows. bytes: 2050 + 100*2 = 2,250
+        //   L1 2 rows. bytes: 2050 + 100*2 = 2,250
+        //   total = 2,250 + 2,250 = 4,500
         let partition5 = table.with_shard(&shard).create_partition("five").await;
         let pf5_1 = TestParquetFileBuilder::default()
             .with_min_time(1)
             .with_max_time(5)
             .with_row_count(2)
             .with_compaction_level(CompactionLevel::Initial)
             .with_creation_time(hot_time_one_hour_ago);
         partition5.create_parquet_file_catalog_record(pf5_1).await;

         let pf5_2 = TestParquetFileBuilder::default()
             .with_min_time(4) // overlapped with pf5_1
             .with_max_time(6)
             .with_row_count(2)
             .with_compaction_level(CompactionLevel::FileNonOverlapped)
             .with_creation_time(hot_time_one_hour_ago);
         partition5.create_parquet_file_catalog_record(pf5_2).await;

         // P6:
-        //   L0 with 2 rows. bytes = 1125 * 2 = 2,250
-        //   L1 with 2 rows. bytes = 1125 * 2 = 2,250
-        //   total = 4,500
+        //   L0 2 rows. bytes: 2050 + 100*2 = 2,250
+        //   L1 2 rows. bytes: 2050 + 100*2 = 2,250
+        //   total = 2,250 + 2,250 = 4,500
         let partition6 = table.with_shard(&shard).create_partition("six").await;
         let pf6_1 = TestParquetFileBuilder::default()
             .with_min_time(1)
@@ -751,7 +753,7 @@ pub mod tests {
         //
         // * Round 1: 3 candidates [P1, P2, P5] and total needed budget 13,500
         // * Round 2: 1 candidate [P6] and total needed budget 4,500
-        // * Round 3: 1 candidate [P3] and total needed budget 11,250
+        // * Round 3: 1 candidate [P3] and total needed budget 13,100
         //
         // P4 is not compacted due to overbudget.
         compact_candidates_with_memory_budget(
@@ -818,7 +820,7 @@ pub mod tests {
         assert_eq!(skipped_compactions[0].partition_id, partition4.partition.id);
         assert_eq!(
             skipped_compactions[0].reason,
-            "over memory budget. Needed budget = 15750, memory budget = 13500"
+            "over memory budget. Needed budget = 15100, memory budget = 14350"
         );
     }
 }
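
Reviewer note: the byte figures quoted in the test comments above can be checked with the following self-contained Rust sketch of estimate_arrow_bytes_for_file as it behaves after this patch. Treat it as an illustration rather than the crate's actual code: the constant and function names come from the diff, but the constant values (8, 1000, 1, 8, 200, 2) are inferred from the test assertions, and the ColumnType/ColumnTypeCount definitions are simplified stand-ins for the real IOx types.

// Constant values are inferred from this patch's test assertions
// (e.g. `800 = 10 * (1+2+3+4) * 8` implies VALUE_BYTE = 8); only the
// names are taken from the diff.
const VALUE_BYTE: i64 = 8;
const STRING_LENGTH: i64 = 1000;
const BOOL_BYTE: i64 = 1;
const DICTIONARY_BYTE: i64 = 8;
const AVERAGE_TAG_VALUE_LENGTH: i64 = 200;
const AVERAGE_ROW_COUNT_CARDINALITY_RATIO: i64 = 2;

// Simplified stand-ins for the IOx catalog types used by the estimator.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum ColumnType { I64, U64, F64, Time, String, Bool, Tag }

struct ColumnTypeCount { col_type: ColumnType, count: i64 }

fn estimate_arrow_bytes_for_file(
    columns: &[ColumnTypeCount],
    min_num_rows_allocated_per_record_batch_to_datafusion_plan: u64,
    row_count: i64,
) -> i64 {
    // Tag cardinality is still estimated from the file's total row count...
    let average_cardinality = row_count / AVERAGE_ROW_COUNT_CARDINALITY_RATIO;

    // ...but per-batch byte counts now always use the configured batch size,
    // since DataFusion streams the file in fixed-size record batches.
    let row_count_per_batch =
        min_num_rows_allocated_per_record_batch_to_datafusion_plan as i64;

    let mut value_bytes = 0;
    let mut string_bytes = 0;
    let mut bool_bytes = 0;
    let mut dictionary_key_bytes = 0;
    let mut dictionary_value_bytes = 0;
    for c in columns {
        match c.col_type {
            ColumnType::I64 | ColumnType::U64 | ColumnType::F64 | ColumnType::Time => {
                value_bytes += c.count * row_count_per_batch * VALUE_BYTE;
            }
            ColumnType::String => string_bytes += row_count_per_batch * STRING_LENGTH,
            ColumnType::Bool => bool_bytes += row_count_per_batch * BOOL_BYTE,
            ColumnType::Tag => {
                dictionary_key_bytes += average_cardinality * AVERAGE_TAG_VALUE_LENGTH;
                dictionary_value_bytes = row_count_per_batch * DICTIONARY_BYTE;
            }
        }
    }
    value_bytes + string_bytes + bool_bytes + dictionary_key_bytes + dictionary_value_bytes
}

fn main() {
    // The 5-column table from the lib.rs test (tag, time, int, string, and
    // bool fields) with batch size 2 costs 2050 + 100 * row_count bytes per
    // file (for even row counts, since integer division halves row_count).
    let columns = [
        ColumnTypeCount { col_type: ColumnType::Tag, count: 1 },
        ColumnTypeCount { col_type: ColumnType::Time, count: 1 },
        ColumnTypeCount { col_type: ColumnType::I64, count: 1 },
        ColumnTypeCount { col_type: ColumnType::String, count: 1 },
        ColumnTypeCount { col_type: ColumnType::Bool, count: 1 },
    ];
    assert_eq!(estimate_arrow_bytes_for_file(&columns, 2, 2), 2_250); // P1/P2/P5/P6 files
    assert_eq!(estimate_arrow_bytes_for_file(&columns, 2, 50), 7_050); // P3 L0
    assert_eq!(estimate_arrow_bytes_for_file(&columns, 2, 40), 6_050); // P3 L1 and P4 L1
    assert_eq!(estimate_arrow_bytes_for_file(&columns, 2, 70), 9_050); // P4 L0
    // P4 therefore needs 9,050 + 6,050 = 15,100 bytes, which exceeds the
    // 14 * 1025 = 14,350-byte budget and matches the skipped-compaction reason.
}

Compiled as a standalone binary, the asserts pass and reproduce every per-file figure in the P1-P6 comments, the per-round budgets, and the over-budget total that lands P4 in skipped_compactions.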