chore: always use a fixed number of rows (8192) per batch to estimate memory (#5733)

pull/24376/head
Nga Tran 2022-09-23 11:51:25 -04:00 committed by GitHub
parent da697815ff
commit bb7df22aa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 44 deletions

View File

@ -18,7 +18,6 @@ use parquet_file::storage::ParquetStorage;
use schema::sort::SortKey;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
cmp::max,
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
@ -516,11 +515,9 @@ fn estimate_arrow_bytes_for_file(
let average_cardinality = row_count / AVERAGE_ROW_COUNT_CARDINALITY_RATIO;
// Since there is a minimum number of rows per batch, we will use that minimum number of rows if the row_count is smaller
let row_count = max(
row_count,
min_num_rows_allocated_per_record_batch_to_datafusion_plan as i64,
);
// Since DataFusion streams files and allocates a fixed (configurable) number of rows per batch,
// we always use that number to estimate the memory usage per batch.
let row_count_per_batch = min_num_rows_allocated_per_record_batch_to_datafusion_plan as i64;
// Bytes needed for number columns
let mut value_bytes = 0;
@ -531,17 +528,17 @@ fn estimate_arrow_bytes_for_file(
for c in columns {
match c.col_type {
ColumnType::I64 | ColumnType::U64 | ColumnType::F64 | ColumnType::Time => {
value_bytes += c.count * row_count * VALUE_BYTE;
value_bytes += c.count * row_count_per_batch * VALUE_BYTE;
}
ColumnType::String => {
string_bytes += row_count * STRING_LENGTH;
string_bytes += row_count_per_batch * STRING_LENGTH;
}
ColumnType::Bool => {
bool_bytes += row_count * BOOL_BYTE;
bool_bytes += row_count_per_batch * BOOL_BYTE;
}
ColumnType::Tag => {
dictionary_key_bytes += average_cardinality * AVERAGE_TAG_VALUE_LENGTH;
dictionary_value_bytes = row_count * DICTIONARY_BYTE;
dictionary_value_bytes = row_count_per_batch * DICTIONARY_BYTE;
}
}
}
@ -585,7 +582,9 @@ pub mod tests {
#[test]
fn test_estimate_arrow_bytes_for_file_small_row_count() {
let row_count = 11;
let min_num_rows_allocated_per_record_batch = 20; // > row_count and will be used
// Always use this config param to estimate memory usage for each batch,
// regardless of whether it is larger or smaller than row_count
let min_num_rows_allocated_per_record_batch = 20;
// Time, U64, I64, F64
let columns = vec![
@ -649,7 +648,9 @@ pub mod tests {
#[test]
fn test_estimate_arrow_bytes_for_file_large_row_count() {
let row_count = 11;
let min_num_rows_allocated_per_record_batch = 10; // < row_count and won't be used
// Always use this config param to estimate memory usage for each batch
// even if it is smaller than row_count
let min_num_rows_allocated_per_record_batch = 10;
// Time, U64, I64, F64
let columns = vec![
@ -663,7 +664,7 @@ pub mod tests {
min_num_rows_allocated_per_record_batch,
row_count,
);
assert_eq!(bytes, 880); // 11 * (1+2+3+4) * 8
assert_eq!(bytes, 800); // 10 * (1+2+3+4) * 8
// Tag
let columns = vec![ColumnTypeCount::new(ColumnType::Tag, 1)];
@ -672,7 +673,7 @@ pub mod tests {
min_num_rows_allocated_per_record_batch,
row_count,
);
assert_eq!(bytes, 1088); // 5 * 200 + 11 * 8
assert_eq!(bytes, 1080); // 5 * 200 + 10 * 8
// String
let columns = vec![ColumnTypeCount::new(ColumnType::String, 1)];
@ -681,7 +682,7 @@ pub mod tests {
min_num_rows_allocated_per_record_batch,
row_count,
);
assert_eq!(bytes, 11000); // 11 * 1000
assert_eq!(bytes, 10000); // 10 * 1000
// Bool
let columns = vec![ColumnTypeCount::new(ColumnType::Bool, 1)];
@ -690,7 +691,7 @@ pub mod tests {
min_num_rows_allocated_per_record_batch,
row_count,
);
assert_eq!(bytes, 11); // 11 * 1
assert_eq!(bytes, 10); // 10 * 1
// all types
let columns = vec![
@ -707,7 +708,7 @@ pub mod tests {
min_num_rows_allocated_per_record_batch,
row_count,
);
assert_eq!(bytes, 12979); // 880 + 1088 + 11000 + 11
assert_eq!(bytes, 11890); // 800 + 1080 + 10000 + 10
}
fn make_compactor_config() -> CompactorConfig {

View File

@ -524,6 +524,8 @@ pub mod tests {
}
fn make_compactor_config() -> CompactorConfig {
// All numbers in here were chosen carefully and many tests depend on them.
// Changing them will break those tests.
CompactorConfig {
max_desired_file_size_bytes: 100_000_000,
percentage_max_file_size: 90,
@ -531,8 +533,8 @@ pub mod tests {
max_number_partitions_per_shard: 100,
min_number_recent_ingested_files_per_partition: 1,
hot_multiple: 4,
memory_budget_bytes: 12 * 1125, // 13,500 bytes
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 1,
memory_budget_bytes: 14 * 1025, // 14,350
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 2,
max_num_compacting_files: 20,
}
}
@ -552,8 +554,8 @@ pub mod tests {
let shard = namespace.create_shard(1).await;
// Create a scenario of a table of 5 columns: tag, time, field int, field string, field
// bool. Thus, each file will have estimated memory bytes = 1125 * row_count (for even
// row_counts).
// bool. Thus, given min_num_rows_allocated_per_record_batch_to_datafusion_plan = 2,
// each file will have estimated memory bytes = 2050 + 100 * row_count (for even row_counts)
let table = namespace.create_table("test_table").await;
table.create_column("tag", ColumnType::Tag).await;
@ -605,8 +607,8 @@ pub mod tests {
let hot_time_one_hour_ago = compactor.time_provider.hours_ago(1);
// P1:
// L0 2 rows. bytes: 1125 * 2 = 2,250
// L1 2 rows. bytes: 1125 * 2 = 2,250
// L0 2 rows. bytes: 2050 + 100*2 = 2,250
// L1 2 rows. bytes: 2050 + 100*2 = 2,250
// total = 2,250 + 2,250 = 4,500
let partition1 = table.with_shard(&shard).create_partition("one").await;
@ -627,8 +629,8 @@ pub mod tests {
partition1.create_parquet_file_catalog_record(pf1_2).await;
// P2:
// L0 2 rows. bytes: 1125 * 2 = 2,250
// L1 2 rows. bytes: 1125 * 2 = 2,250
// L0 2 rows. bytes: 2050 + 100*2 = 2,250
// L1 2 rows. bytes: 2050 + 100*2 = 2,250
// total = 2,250 + 2,250 = 4,500
let partition2 = table.with_shard(&shard).create_partition("two").await;
@ -648,15 +650,15 @@ pub mod tests {
.with_creation_time(hot_time_one_hour_ago);
partition2.create_parquet_file_catalog_record(pf2_2).await;
// P3: bytes >= 90% of full budget = 90% * 13,500 = 12,150
// L0 6 rows. bytes: 1125 * 6 = 6,750
// L1 4 rows. bytes: 1125 * 4 = 4,500
// total = 6,700 + 4,500 = 12,150
// P3: bytes >= 90% of full budget = 90% * 14,350 = 12,915
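// L0 40 rows. bytes: 2050 + 100 * 40 = 6,050
// L1 24 rows. bytes: 2050 + 100 * 24 = 4,450
// total = 6,050 + 4,450 = 10,500
// NOTE(review): 10,500 is below the 12,915 threshold named in the P3 header;
// confirm whether the row counts (40, 24) or the threshold remark is intended.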
let partition3 = table.with_shard(&shard).create_partition("three").await;
let pf3_1 = TestParquetFileBuilder::default()
.with_min_time(1)
.with_max_time(5)
.with_row_count(6)
.with_row_count(40)
.with_compaction_level(CompactionLevel::Initial)
.with_creation_time(hot_time_one_hour_ago);
partition3.create_parquet_file_catalog_record(pf3_1).await;
@ -664,20 +666,20 @@ pub mod tests {
let pf3_2 = TestParquetFileBuilder::default()
.with_min_time(4) // overlapped with pf3_1
.with_max_time(6)
.with_row_count(4)
.with_row_count(24)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_creation_time(hot_time_one_hour_ago);
partition3.create_parquet_file_catalog_record(pf3_2).await;
// P4: Over the full budget
// L0 with 8 rows.bytes = 1125 * 8 = 9,000
// L1 with 6 rows.bytes = 1125 * 6 = 6,750
// total = 15,750
// L0 with 70 rows.bytes = 2050 + 100 * 70 = 9,050
// L1 with 40 rows.bytes = 2050 + 100 * 40 = 6,050
// total = 9,050 + 6,050 = 15,100
let partition4 = table.with_shard(&shard).create_partition("four").await;
let pf4_1 = TestParquetFileBuilder::default()
.with_min_time(1)
.with_max_time(5)
.with_row_count(8)
.with_row_count(70)
.with_compaction_level(CompactionLevel::Initial)
.with_creation_time(hot_time_one_hour_ago);
partition4.create_parquet_file_catalog_record(pf4_1).await;
@ -685,15 +687,15 @@ pub mod tests {
let pf4_2 = TestParquetFileBuilder::default()
.with_min_time(4) // overlapped with pf4_1
.with_max_time(6)
.with_row_count(6)
.with_row_count(40)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_creation_time(hot_time_one_hour_ago);
partition4.create_parquet_file_catalog_record(pf4_2).await;
// P5:
// L0 with 2 rows.bytes = 1125 * 2 = 2,250
// L1 with 2 rows.bytes = 1125 * 2 = 2,250
// total = 4,500
// L0 2 rows. bytes: 2050 + 100*2 = 2,250
// L1 2 rows. bytes: 2050 + 100*2 = 2,250
// total = 2,250 + 2,250 = 4,500
let partition5 = table.with_shard(&shard).create_partition("five").await;
let pf5_1 = TestParquetFileBuilder::default()
.with_min_time(1)
@ -712,9 +714,9 @@ pub mod tests {
partition5.create_parquet_file_catalog_record(pf5_2).await;
// P6:
// L0 with 2 rows.bytes = 1125 * 2 = 2,250
// L1 with 2 rows.bytes = 1125 * 2 = 2,250
// total = 4,500
// L0 2 rows. bytes: 2050 + 100*2 = 2,250
// L1 2 rows. bytes: 2050 + 100*2 = 2,250
// total = 2,250 + 2,250 = 4,500
let partition6 = table.with_shard(&shard).create_partition("six").await;
let pf6_1 = TestParquetFileBuilder::default()
.with_min_time(1)
@ -751,7 +753,7 @@ pub mod tests {
//
// * Round 1: 3 candidates [P1, P2, P5] and total needed budget 13,500
// * Round 2: 1 candidate [P6] and total needed budget 4,500
// * Round 3: 1 candidate [P3] and total needed budget 11,250
// * Round 3: 1 candidate [P3] and total needed budget 10,500 (6,050 + 4,450)
//
// P4 is not compacted due to overbudget.
compact_candidates_with_memory_budget(
@ -818,7 +820,7 @@ pub mod tests {
assert_eq!(skipped_compactions[0].partition_id, partition4.partition.id);
assert_eq!(
skipped_compactions[0].reason,
"over memory budget. Needed budget = 15750, memory budget = 13500"
"over memory budget. Needed budget = 15100, memory budget = 14350"
);
}
}