diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 1386d1c4e2..6658697cc3 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -491,12 +491,10 @@ impl PartitionCompactionCandidateWithInfo { pub fn estimated_arrow_bytes( &self, min_num_rows_allocated_per_record_batch_to_datafusion_plan: u64, - row_count: i64, ) -> u64 { estimate_arrow_bytes_for_file( &self.column_type_counts, min_num_rows_allocated_per_record_batch_to_datafusion_plan, - row_count, ) } } @@ -504,7 +502,6 @@ impl PartitionCompactionCandidateWithInfo { fn estimate_arrow_bytes_for_file( columns: &[ColumnTypeCount], min_num_rows_allocated_per_record_batch_to_datafusion_plan: u64, - row_count: i64, ) -> u64 { const AVERAGE_TAG_VALUE_LENGTH: i64 = 200; const STRING_LENGTH: i64 = 1000; @@ -513,12 +510,12 @@ fn estimate_arrow_bytes_for_file( const BOOL_BYTE: i64 = 1; const AVERAGE_ROW_COUNT_CARDINALITY_RATIO: i64 = 2; - let average_cardinality = row_count / AVERAGE_ROW_COUNT_CARDINALITY_RATIO; - // Since DataFusion streams files and allocates a fixed (configurable) number of rows per batch, // we always use that number to estimate the memory usage per batch. 
let row_count_per_batch = min_num_rows_allocated_per_record_batch_to_datafusion_plan as i64; + let average_cardinality = row_count_per_batch / AVERAGE_ROW_COUNT_CARDINALITY_RATIO; + // Bytes needed for number columns let mut value_bytes = 0; let mut string_bytes = 0; @@ -582,7 +579,6 @@ pub mod tests { #[test] fn test_estimate_arrow_bytes_for_file_small_row_count() { - let row_count = 11; // Always use this config param to estimate memory usage for each batch // no matter it is larger or smaller than row_count let min_num_rows_allocated_per_record_batch = 20; @@ -594,38 +590,26 @@ pub mod tests { ColumnTypeCount::new(ColumnType::F64, 3), ColumnTypeCount::new(ColumnType::I64, 4), ]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); assert_eq!(bytes, 1600); // 20 * (1+2+3+4) * 8 // Tag let columns = vec![ColumnTypeCount::new(ColumnType::Tag, 1)]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); - assert_eq!(bytes, 1160); // 5 * 200 + 20 * 8 + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); + assert_eq!(bytes, 2160); // 10 * 200 + 20 * 8 // String let columns = vec![ColumnTypeCount::new(ColumnType::String, 1)]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); assert_eq!(bytes, 20000); // 20 * 1000 // Bool let columns = vec![ColumnTypeCount::new(ColumnType::Bool, 1)]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); assert_eq!(bytes, 20); // 20 * 1 // all types @@ 
-638,17 +622,13 @@ pub mod tests { ColumnTypeCount::new(ColumnType::String, 1), ColumnTypeCount::new(ColumnType::Bool, 1), ]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); - assert_eq!(bytes, 22780); // 1600 + 1160 + 20000 + 20 + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); + assert_eq!(bytes, 23780); // 1600 + 2160 + 20000 + 20 } #[test] fn test_estimate_arrow_bytes_for_file_large_row_count() { - let row_count = 11; // Always use this config param to estimate memory usage for each batch // even if it is smaller than row_count let min_num_rows_allocated_per_record_batch = 10; @@ -660,38 +640,26 @@ pub mod tests { ColumnTypeCount::new(ColumnType::F64, 3), ColumnTypeCount::new(ColumnType::I64, 4), ]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); assert_eq!(bytes, 800); // 10 * (1+2+3+4) * 8 // Tag let columns = vec![ColumnTypeCount::new(ColumnType::Tag, 1)]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); assert_eq!(bytes, 1080); // 5 * 200 + 10 * 8 // String let columns = vec![ColumnTypeCount::new(ColumnType::String, 1)]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); assert_eq!(bytes, 10000); // 10 * 1000 // Bool let columns = vec![ColumnTypeCount::new(ColumnType::Bool, 1)]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); + let bytes = + estimate_arrow_bytes_for_file(&columns, 
min_num_rows_allocated_per_record_batch); assert_eq!(bytes, 10); // 10 * 1 // all types @@ -704,11 +672,8 @@ pub mod tests { ColumnTypeCount::new(ColumnType::String, 1), ColumnTypeCount::new(ColumnType::Bool, 1), ]; - let bytes = estimate_arrow_bytes_for_file( - &columns, - min_num_rows_allocated_per_record_batch, - row_count, - ); + let bytes = + estimate_arrow_bytes_for_file(&columns, min_num_rows_allocated_per_record_batch); assert_eq!(bytes, 11890); // 800 + 1080 + 10000 + 10 } diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index 6f86700500..78db326055 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -275,7 +275,7 @@ impl Drop for CompactorHandlerImpl { #[cfg(test)] mod tests { use super::*; - use crate::tests::{test_setup, TestSetup}; + use crate::tests::{test_setup_with_default_budget, TestSetup}; #[tokio::test] async fn list_skipped_compactions() { @@ -284,7 +284,7 @@ mod tests { table, shard, .. - } = test_setup().await; + } = test_setup_with_default_budget().await; let compactor_handler = CompactorHandlerImpl::new(Arc::clone(&compactor)); @@ -318,7 +318,7 @@ mod tests { table, shard, .. - } = test_setup().await; + } = test_setup_with_default_budget().await; let compactor_handler = CompactorHandlerImpl::new(Arc::clone(&compactor)); diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 7e072853a9..ffd10b8145 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -485,7 +485,7 @@ pub mod tests { compactor, mock_compactor, .. - } = test_setup().await; + } = test_setup(14350).await; let sorted_candidates = VecDeque::new(); @@ -548,7 +548,7 @@ pub mod tests { } } - fn make_compactor_config() -> CompactorConfig { + fn make_compactor_config(budget: u64) -> CompactorConfig { // All numbers in here are chosen carefully for many tests. 
// Change them will break the tests CompactorConfig { @@ -558,7 +558,7 @@ pub mod tests { max_number_partitions_per_shard: 100, min_number_recent_ingested_files_per_partition: 1, hot_multiple: 4, - memory_budget_bytes: 14 * 1025, // 14,350 + memory_budget_bytes: budget, min_num_rows_allocated_per_record_batch_to_datafusion_plan: 2, max_num_compacting_files: 20, } @@ -571,7 +571,11 @@ pub mod tests { pub(crate) table: Arc, } - pub(crate) async fn test_setup() -> TestSetup { + pub(crate) async fn test_setup_with_default_budget() -> TestSetup { + test_setup(14350).await + } + + pub(crate) async fn test_setup(budget: u64) -> TestSetup { let catalog = TestCatalog::new(); let namespace = catalog .create_namespace("namespace_hot_partitions_to_compact") @@ -580,7 +584,8 @@ pub mod tests { // Create a scenario of a table of 5 columns: tag, time, field int, field string, field // bool. Thus, given min_num_rows_allocated_per_record_batch_to_datafusion_plan = 2, - // each file will have estimated memory bytes = 2050 + 100 * row_count (for even row_counts) + //// todo each file will have estimated memory bytes = 2050 + 100 * row_count (for even row_counts) + // each file will have estimated memory bytes = 2,250 let table = namespace.create_table("test_table").await; table.create_column("tag", ColumnType::Tag).await; @@ -592,9 +597,8 @@ pub mod tests { table.create_column("field_bool", ColumnType::Bool).await; // Create a compactor - // Compactor budget : 13,500 let time_provider = Arc::new(SystemProvider::new()); - let config = make_compactor_config(); + let config = make_compactor_config(budget); let compactor = Arc::new(Compactor::new( vec![shard.shard.id], Arc::clone(&catalog.catalog), @@ -626,17 +630,18 @@ pub mod tests { shard, table, .. - } = test_setup().await; + } = test_setup(14350).await; // Some times in the past to set to created_at of the files let hot_time_one_hour_ago = compactor.time_provider.hours_ago(1); // P1: - // L0 2 rows. 
bytes: 2050 + 100*2 = 2,250 - // L1 2 rows. bytes: 2050 + 100*2 = 2,250 + // L0 2 rows. bytes: 2,250 + // L1 2 rows. bytes: 2,250 // total = 2,250 + 2,250 = 4,500 let partition1 = table.with_shard(&shard).create_partition("one").await; + // 2 files with IDs 1 and 2 let pf1_1 = TestParquetFileBuilder::default() .with_min_time(1) .with_max_time(5) @@ -654,11 +659,12 @@ pub mod tests { partition1.create_parquet_file_catalog_record(pf1_2).await; // P2: - // L0 2 rows. bytes: 2050 + 100*2 = 2,250 - // L1 2 rows. bytes: 2050 + 100*2 = 2,250 + // L0 2 rows. bytes: 2,250 + // L1 2 rows. bytes: 2,250 // total = 2,250 + 2,250 = 4,500 let partition2 = table.with_shard(&shard).create_partition("two").await; + // 2 files with IDs 3 and 4 let pf2_1 = TestParquetFileBuilder::default() .with_min_time(1) .with_max_time(5) @@ -676,52 +682,63 @@ pub mod tests { partition2.create_parquet_file_catalog_record(pf2_2).await; // P3: bytes >= 90% of full budget = 90% * 14,350 = 12,915 - // L0 40 rows. bytes: 2050 + 100 * 50 = 7,050 - // L1 24 rows. bytes: 2050 + 100 * 40 = 6,050 - // total = 7,050 + 6,050 = 13,100 + // L0 40 rows. bytes: 2,250 + // Five L1s. 
bytes: 2,250 each + // total = 2,250 * 6 = 13,500 let partition3 = table.with_shard(&shard).create_partition("three").await; + + // 6 files with IDs 5, 6, 7, 8, 9, 10 let pf3_1 = TestParquetFileBuilder::default() .with_min_time(1) - .with_max_time(5) + .with_max_time(6) .with_row_count(40) .with_compaction_level(CompactionLevel::Initial) .with_creation_time(hot_time_one_hour_ago); partition3.create_parquet_file_catalog_record(pf3_1).await; - let pf3_2 = TestParquetFileBuilder::default() - .with_min_time(4) // overlapped with pf3_1 - .with_max_time(6) - .with_row_count(24) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_creation_time(hot_time_one_hour_ago); - partition3.create_parquet_file_catalog_record(pf3_2).await; + // Five overlapped L1 files + for i in 1..6 { + let pf3_i = TestParquetFileBuilder::default() + .with_min_time(i) // overlapped with pf3_1 + .with_max_time(i) + .with_row_count(24) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition3.create_parquet_file_catalog_record(pf3_i).await; + } // P4: Over the full budget - // L0 with 70 rows.bytes = 2050 + 100 * 70 = 9,050 - // L1 with 40 rows.bytes = 2050 + 100 * 40 = 6,050 - // total = 16,000 + // L0 40 rows. bytes: 2,250 + // Six L1s. 
bytes: 2,250 each + // total = 2,250 * 7 = 15,750 > 14350 let partition4 = table.with_shard(&shard).create_partition("four").await; + + // 7 files with IDs 11, 12, 13, 14, 15, 16, 17 let pf4_1 = TestParquetFileBuilder::default() .with_min_time(1) - .with_max_time(5) + .with_max_time(7) .with_row_count(70) .with_compaction_level(CompactionLevel::Initial) .with_creation_time(hot_time_one_hour_ago); partition4.create_parquet_file_catalog_record(pf4_1).await; - let pf4_2 = TestParquetFileBuilder::default() - .with_min_time(4) // overlapped with pf4_1 - .with_max_time(6) - .with_row_count(40) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_creation_time(hot_time_one_hour_ago); - partition4.create_parquet_file_catalog_record(pf4_2).await; + // Six overlapped L1 files + for i in 1..7 { + let pf4_i = TestParquetFileBuilder::default() + .with_min_time(i) // overlapped with pf4_1 + .with_max_time(i) + .with_row_count(40) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition4.create_parquet_file_catalog_record(pf4_i).await; + } // P5: - // L0 2 rows. bytes: 2050 + 100*2 = 2,250 - // L1 2 rows. bytes: 2050 + 100*2 = 2,250 + // L0 2 rows. bytes: 2,250 + // L1 2 rows. bytes: 2,250 // total = 2,250 + 2,250 = 4,500 let partition5 = table.with_shard(&shard).create_partition("five").await; + // 2 files with IDs 18, 19 let pf5_1 = TestParquetFileBuilder::default() .with_min_time(1) .with_max_time(5) @@ -739,10 +756,11 @@ pub mod tests { partition5.create_parquet_file_catalog_record(pf5_2).await; // P6: - // L0 2 rows. bytes: 2050 + 100*2 = 2,250 - // L1 2 rows. bytes: 2050 + 100*2 = 2,250 + // L0 2 rows. bytes: 2,250 + // L1 2 rows. 
bytes: 2,250 // total = 2,250 + 2,250 = 4,500 let partition6 = table.with_shard(&shard).create_partition("six").await; + // 2 files with IDs 20, 21 let pf6_1 = TestParquetFileBuilder::default() .with_min_time(1) .with_max_time(5) @@ -816,7 +834,7 @@ pub mod tests { assert_eq!(g1_candidate3.partition.id(), partition5.partition.id); let g1_candidate3_pf_ids: Vec<_> = g1_candidate3.files.iter().map(|pf| pf.id().get()).collect(); - assert_eq!(g1_candidate3_pf_ids, vec![10, 9]); + assert_eq!(g1_candidate3_pf_ids, vec![19, 18]); // Round 2 let group2 = &compaction_groups[1]; @@ -826,7 +844,7 @@ pub mod tests { assert_eq!(g2_candidate1.partition.id(), partition6.partition.id); let g2_candidate1_pf_ids: Vec<_> = g2_candidate1.files.iter().map(|pf| pf.id().get()).collect(); - assert_eq!(g2_candidate1_pf_ids, vec![12, 11]); + assert_eq!(g2_candidate1_pf_ids, vec![21, 20]); // Round 3 let group3 = &compaction_groups[2]; @@ -836,7 +854,8 @@ pub mod tests { assert_eq!(g3_candidate1.partition.id(), partition3.partition.id); let g3_candidate1_pf_ids: Vec<_> = g3_candidate1.files.iter().map(|pf| pf.id().get()).collect(); - assert_eq!(g3_candidate1_pf_ids, vec![6, 5]); + // all IDs of level-1 first then level-0 + assert_eq!(g3_candidate1_pf_ids, vec![6, 7, 8, 9, 10, 5]); { let mut repos = compactor.catalog.repositories().await; diff --git a/compactor/src/parquet_file_lookup.rs b/compactor/src/parquet_file_lookup.rs index f08fa2354a..9010b9e17b 100644 --- a/compactor/src/parquet_file_lookup.rs +++ b/compactor/src/parquet_file_lookup.rs @@ -89,10 +89,8 @@ impl ParquetFilesForCompaction { for parquet_file in parquet_files { // Estimate the bytes DataFusion needs when scan this file - let estimated_arrow_bytes = partition.estimated_arrow_bytes( - min_num_rows_allocated_per_record_batch_to_datafusion_plan, - parquet_file.row_count, - ); + let estimated_arrow_bytes = partition + .estimated_arrow_bytes(min_num_rows_allocated_per_record_batch_to_datafusion_plan); // Estimated bytes to
store this file in memory let estimated_bytes_to_store_in_memory = 2 * parquet_file.file_size_bytes as u64; let parquet_file = match size_overrides.get(&parquet_file.id) {