From c21ac9050b298fd6944666bce4e86bb9c2e3cf86 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 31 Aug 2022 13:58:38 -0400 Subject: [PATCH 1/4] refactor: Extract a test util fn that will only create parquet file catalog records --- iox_tests/src/util.rs | 88 +++++++++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 19 deletions(-) diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index b288a349d0..310251e5fa 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -470,7 +470,8 @@ impl TestPartition { }) } - /// Create a Parquet file in this partition with attributes specified by the builder + /// Create a Parquet file in this partition in object storage and the catalog with attributes + /// specified by the builder pub async fn create_parquet_file( self: &Arc, builder: TestParquetFileBuilder, @@ -479,13 +480,14 @@ impl TestPartition { record_batch, table, schema, - max_seq, + max_sequence_number, min_time, max_time, file_size_bytes, creation_time, compaction_level, to_delete, + object_store_id, } = builder; let record_batch = record_batch.expect("A record batch is required"); @@ -499,11 +501,9 @@ impl TestPartition { let row_count = record_batch.num_rows(); assert!(row_count > 0, "Parquet file must have at least 1 row"); - let (record_batch, sort_key) = sort_batch(record_batch, schema); + let (record_batch, sort_key) = sort_batch(record_batch, schema.clone()); let record_batch = dedup_batch(record_batch, &sort_key); - let object_store_id = Uuid::new_v4(); - let max_sequence_number = SequenceNumber::new(max_seq); let metadata = IoxMetadata { object_store_id, creation_timestamp: now(), @@ -518,21 +518,70 @@ impl TestPartition { compaction_level: CompactionLevel::Initial, sort_key: Some(sort_key.clone()), }; - let table_catalog_schema = self.table.catalog_schema().await; - let column_set = ColumnSet::new(record_batch.schema().fields().iter().map(|f| { - table_catalog_schema - .columns - .get(f.name()) - .expect("Column registered") - .id - })); let real_file_size_bytes = create_parquet_file( ParquetStorage::new(Arc::clone(&self.catalog.object_store)), &metadata, - record_batch, + record_batch.clone(), ) .await; + let builder = TestParquetFileBuilder { + record_batch: Some(record_batch), + table: Some(table), + schema: Some(schema), + max_sequence_number, + min_time, + max_time, + file_size_bytes: Some(file_size_bytes.unwrap_or(real_file_size_bytes as u64)), + creation_time, + compaction_level, + to_delete, + object_store_id, + }; + + let result = self.create_parquet_file_catalog_record(builder).await; + let mut repos = self.catalog.catalog.repositories().await; + update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await; + result + } + + /// Only update the catalog with the builder's info, don't create anything in object storage. + /// Record batch is not required in this case. + pub async fn create_parquet_file_catalog_record( + self: &Arc, + builder: TestParquetFileBuilder, + ) -> TestParquetFile { + let TestParquetFileBuilder { + record_batch, + max_sequence_number, + min_time, + max_time, + file_size_bytes, + creation_time, + compaction_level, + to_delete, + object_store_id, + .. + } = builder; + + let table_catalog_schema = self.table.catalog_schema().await; + + let (row_count, column_set) = if let Some(record_batch) = record_batch { + let column_set = ColumnSet::new(record_batch.schema().fields().iter().map(|f| { + table_catalog_schema + .columns + .get(f.name()) + .expect("Column registered") + .id + })); + + (record_batch.num_rows(), column_set) + } else { + let column_set = + ColumnSet::new(table_catalog_schema.columns.values().map(|col| col.id)); + (0, column_set) + }; + let parquet_file_params = ParquetFileParams { shard_id: self.shard.shard.id, namespace_id: self.namespace.namespace.id, @@ -542,7 +591,7 @@ impl TestPartition { max_sequence_number, min_time: Timestamp::new(min_time), max_time: Timestamp::new(max_time), - file_size_bytes: file_size_bytes.unwrap_or(real_file_size_bytes as u64) as i64, + file_size_bytes: file_size_bytes.unwrap_or(0) as i64, row_count: row_count as i64, created_at: Timestamp::new(creation_time), compaction_level, @@ -555,7 +604,6 @@ impl TestPartition { .create(parquet_file_params) .await .unwrap(); - update_catalog_sort_key_if_needed(repos.partitions(), self.partition.id, sort_key).await; if to_delete { repos @@ -582,13 +630,14 @@ pub struct TestParquetFileBuilder { record_batch: Option, table: Option, schema: Option, - max_seq: i64, + max_sequence_number: SequenceNumber, min_time: i64, max_time: i64, file_size_bytes: Option, creation_time: i64, compaction_level: CompactionLevel, to_delete: bool, + object_store_id: Uuid, } impl Default for TestParquetFileBuilder { @@ -597,13 +646,14 @@ impl Default for TestParquetFileBuilder { record_batch: None, table: None, schema: None, - max_seq: 100, + max_sequence_number: SequenceNumber::new(100), min_time: now().timestamp_nanos(), max_time: now().timestamp_nanos(), file_size_bytes: None, creation_time: 1, compaction_level: CompactionLevel::Initial, to_delete: false, + object_store_id: Uuid::new_v4(), } } } @@ -638,7 +688,7 @@ impl TestParquetFileBuilder { /// Specify the maximum sequence number for the parquet file metadata. pub fn with_max_seq(mut self, max_seq: i64) -> Self { - self.max_seq = max_seq; + self.max_sequence_number = SequenceNumber::new(max_seq); self } From a9d664d0bf4bed7d111944ad5e0b54a38ae8eb8a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 31 Aug 2022 14:36:40 -0400 Subject: [PATCH 2/4] feat: Add a way to set the row count on Parquet file catalog entries And only allow setting this when no record batch or line protocol is specified so that there isn't a way to create a parquet file with data that has a mismatched row count. --- iox_tests/src/util.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 310251e5fa..075996e917 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -488,6 +488,7 @@ impl TestPartition { compaction_level, to_delete, object_store_id, + row_count, } = builder; let record_batch = record_batch.expect("A record batch is required"); @@ -499,6 +500,10 @@ impl TestPartition { "Table name of line protocol and partition should have matched", ); + assert!( + row_count.is_none(), + "Cannot have both a record batch and a manually set row_count!" + ); let row_count = record_batch.num_rows(); assert!(row_count > 0, "Parquet file must have at least 1 row"); let (record_batch, sort_key) = sort_batch(record_batch, schema.clone()); @@ -537,6 +542,7 @@ impl TestPartition { compaction_level, to_delete, object_store_id, + row_count: None, // will be computed from the record batch again }; let result = self.create_parquet_file_catalog_record(builder).await; @@ -561,6 +567,7 @@ impl TestPartition { compaction_level, to_delete, object_store_id, + row_count, .. } = builder; @@ -575,11 +582,16 @@ impl TestPartition { .id })); + assert!( + row_count.is_none(), + "Cannot have both a record batch and a manually set row_count!" + ); + (record_batch.num_rows(), column_set) } else { let column_set = ColumnSet::new(table_catalog_schema.columns.values().map(|col| col.id)); - (0, column_set) + (row_count.unwrap_or(0), column_set) }; let parquet_file_params = ParquetFileParams { @@ -638,6 +650,7 @@ pub struct TestParquetFileBuilder { compaction_level: CompactionLevel, to_delete: bool, object_store_id: Uuid, + row_count: Option, } impl Default for TestParquetFileBuilder { @@ -654,6 +667,7 @@ impl Default for TestParquetFileBuilder { compaction_level: CompactionLevel::Initial, to_delete: false, object_store_id: Uuid::new_v4(), + row_count: None, } } } @@ -727,6 +741,13 @@ impl TestParquetFileBuilder { self.to_delete = to_delete; self } + + /// Specify the number of rows in this parquet file. If line protocol/record batch are also + /// set, this will panic! Only use this when you're not specifying any rows! + pub fn with_row_count(mut self, row_count: usize) -> Self { + self.row_count = Some(row_count); + self + } } async fn update_catalog_sort_key_if_needed( From b0e871196c9f52e795ad22d9e15e24079b61401e Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 31 Aug 2022 14:37:59 -0400 Subject: [PATCH 3/4] test: Use more iox test utils in this compactor test --- compactor/src/compact_hot_partitions.rs | 439 +++++++++--------------- 1 file changed, 162 insertions(+), 277 deletions(-) diff --git a/compactor/src/compact_hot_partitions.rs b/compactor/src/compact_hot_partitions.rs index 61bd2f5eff..ad610cbd0c 100644 --- a/compactor/src/compact_hot_partitions.rs +++ b/compactor/src/compact_hot_partitions.rs @@ -305,318 +305,155 @@ mod tests { handler::CompactorConfig, }; use backoff::BackoffConfig; - use data_types::{ - ColumnId, ColumnSet, ColumnType, ColumnTypeCount, CompactionLevel, ParquetFileParams, - SequenceNumber, ShardIndex, Timestamp, - }; + use data_types::{ColumnType, ColumnTypeCount, CompactionLevel}; use iox_query::exec::Executor; - use iox_tests::util::TestCatalog; + use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable}; use iox_time::SystemProvider; use parquet_file::storage::ParquetStorage; use std::{collections::VecDeque, sync::Arc, time::Duration}; - use uuid::Uuid; #[tokio::test] async fn test_compact_hot_partition_candidates() { test_helpers::maybe_start_logging(); - let catalog = TestCatalog::new(); - - // Test setup - // Create a scenario of a table of 5 columns: tag, time, field int, field string, field bool. - // Thus, each file will have estimated memory buytes = 1125 * row_count (for even row_counts) - // The table has n partitions. Each partition has one L0 file and one overlapped L1 file. - let mut txn = catalog.catalog.start_transaction().await.unwrap(); - - let topic = txn.topics().create_or_get("foo").await.unwrap(); - let pool = txn.query_pools().create_or_get("foo").await.unwrap(); - let namespace = txn - .namespaces() - .create( - "namespace_hot_partitions_to_compact", - "inf", - topic.id, - pool.id, - ) - .await - .unwrap(); - - let table = txn - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - - let _col1 = txn - .columns() - .create_or_get("tag", table.id, ColumnType::Tag) - .await - .unwrap(); - let _col2 = txn - .columns() - .create_or_get("time", table.id, ColumnType::Time) - .await - .unwrap(); - let _col3 = txn - .columns() - .create_or_get("field_int", table.id, ColumnType::I64) - .await - .unwrap(); - let _col4 = txn - .columns() - .create_or_get("field_string", table.id, ColumnType::String) - .await - .unwrap(); - let _col5 = txn - .columns() - .create_or_get("field_bool", table.id, ColumnType::Bool) - .await - .unwrap(); - - let shard = txn - .shards() - .create_or_get(&topic, ShardIndex::new(1)) - .await - .unwrap(); - - // Create a compactor - // Compactor budget : 13,500 - let time_provider = Arc::new(SystemProvider::new()); - let config = make_compactor_config(); - let compactor = Compactor::new( - vec![shard.id], - Arc::clone(&catalog.catalog), - ParquetStorage::new(Arc::clone(&catalog.object_store)), - Arc::new(Executor::new(1)), - time_provider, - BackoffConfig::default(), - config, - Arc::new(metric::Registry::new()), - ); + let TestSetup { + compactor, + shard, + table, + .. + } = test_setup().await; // Some times in the past to set to created_at of the files - let time_one_hour_ago = Timestamp::new( - (compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos(), - ); + let hot_time_one_hour_ago = + (compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos(); // P1: // L0 2 rows. bytes: 1125 * 2 = 2,250 // L1 2 rows. bytes: 1125 * 2 = 2,250 // total = 2,250 + 2,250 = 4,500 - let partition1 = txn - .partitions() - .create_or_get("one".into(), shard.id, table.id) - .await - .unwrap(); - // Basic parquet info - let param = ParquetFileParams { - shard_id: shard.id, - namespace_id: namespace.id, - table_id: table.id, - partition_id: partition1.id, - object_store_id: Uuid::new_v4(), - max_sequence_number: SequenceNumber::new(100), - min_time: Timestamp::new(1), - max_time: Timestamp::new(5), - file_size_bytes: 1000, // not matter - row_count: 2, - compaction_level: CompactionLevel::Initial, // L0 - created_at: time_one_hour_ago, // Hot partition - column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), - }; - let _pf1_1 = txn.parquet_files().create(param.clone()).await.unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition1.id, - min_time: Timestamp::new(4), // overlapped - max_time: Timestamp::new(6), - row_count: 2, - compaction_level: CompactionLevel::FileNonOverlapped, // L1 - ..param.clone() - }; - let _pf1_2 = txn.parquet_files().create(paramf).await.unwrap(); - // update sort key - txn.partitions() - .update_sort_key(partition1.id, &["tag", "time"]) - .await - .unwrap(); + let partition1 = table.with_shard(&shard).create_partition("one").await; + + let pf1_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(2) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition1.create_parquet_file_catalog_record(pf1_1).await; + + let pf1_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf1_1 + .with_max_time(6) + .with_row_count(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition1.create_parquet_file_catalog_record(pf1_2).await; // P2: // L0 2 rows. bytes: 1125 * 2 = 2,250 // L1 2 rows. bytes: 1125 * 2 = 2,250 // total = 2,250 + 2,250 = 4,500 - let partition2 = txn - .partitions() - .create_or_get("two".into(), shard.id, table.id) - .await - .unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition2.id, - min_time: Timestamp::new(1), - max_time: Timestamp::new(5), - row_count: 2, - compaction_level: CompactionLevel::Initial, // L0 - ..param.clone() - }; - let _pf2_1 = txn.parquet_files().create(paramf).await.unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition2.id, - min_time: Timestamp::new(4), // overlapped - max_time: Timestamp::new(6), - row_count: 2, - compaction_level: CompactionLevel::FileNonOverlapped, // L1 - ..param.clone() - }; - let _pf2_2 = txn.parquet_files().create(paramf).await.unwrap(); - // update sort key - txn.partitions() - .update_sort_key(partition2.id, &["tag", "time"]) - .await - .unwrap(); + let partition2 = table.with_shard(&shard).create_partition("two").await; + + let pf2_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(2) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition2.create_parquet_file_catalog_record(pf2_1).await; + + let pf2_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf2_1 + .with_max_time(6) + .with_row_count(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition2.create_parquet_file_catalog_record(pf2_2).await; // P3: bytes >= 90% of full budget = 90% * 13,500 = 12,150 // L0 6 rows. bytes: 1125 * 6 = 6,750 // L1 4 rows. bytes: 1125 * 4 = 4,500 // total = 6,700 + 4,500 = 12,150 - let partition3 = txn - .partitions() - .create_or_get("three".into(), shard.id, table.id) - .await - .unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition3.id, - min_time: Timestamp::new(1), - max_time: Timestamp::new(5), - row_count: 6, - compaction_level: CompactionLevel::Initial, // L0 - ..param.clone() - }; - let _pf3_1 = txn.parquet_files().create(paramf).await.unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition3.id, - min_time: Timestamp::new(4), // overlapped - max_time: Timestamp::new(6), - row_count: 4, - compaction_level: CompactionLevel::FileNonOverlapped, // L1 - ..param.clone() - }; - let _pf3_2 = txn.parquet_files().create(paramf).await.unwrap(); - // update sort key - txn.partitions() - .update_sort_key(partition3.id, &["tag", "time"]) - .await - .unwrap(); + let partition3 = table.with_shard(&shard).create_partition("three").await; + let pf3_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(6) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition3.create_parquet_file_catalog_record(pf3_1).await; + + let pf3_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf3_1 + .with_max_time(6) + .with_row_count(4) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition3.create_parquet_file_catalog_record(pf3_2).await; // P4: Over the full budget // L0 with 8 rows.bytes = 1125 * 8 = 9,000 // L1 with 6 rows.bytes = 1125 * 6 = 6,750 // total = 15,750 - let partition4 = txn - .partitions() - .create_or_get("four".into(), shard.id, table.id) - .await - .unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition4.id, - min_time: Timestamp::new(1), - max_time: Timestamp::new(5), - row_count: 8, - compaction_level: CompactionLevel::Initial, // L0 - ..param.clone() - }; - let _pf4_1 = txn.parquet_files().create(paramf).await.unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition4.id, - min_time: Timestamp::new(4), // overlapped - max_time: Timestamp::new(6), - row_count: 6, - compaction_level: CompactionLevel::FileNonOverlapped, // L1 - ..param.clone() - }; - let _pf4_2 = txn.parquet_files().create(paramf).await.unwrap(); - // update sort key - txn.partitions() - .update_sort_key(partition4.id, &["tag", "time"]) - .await - .unwrap(); + let partition4 = table.with_shard(&shard).create_partition("four").await; + let pf4_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(8) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition4.create_parquet_file_catalog_record(pf4_1).await; + + let pf4_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf4_1 + .with_max_time(6) + .with_row_count(6) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition4.create_parquet_file_catalog_record(pf4_2).await; // P5: // L0 with 2 rows.bytes = 1125 * 2 = 2,250 // L1 with 2 rows.bytes = 1125 * 2 = 2,250 // total = 4,500 - let partition5 = txn - .partitions() - .create_or_get("five".into(), shard.id, table.id) - .await - .unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition5.id, - min_time: Timestamp::new(1), - max_time: Timestamp::new(5), - row_count: 2, - compaction_level: CompactionLevel::Initial, // L0 - ..param.clone() - }; - let _pf5_1 = txn.parquet_files().create(paramf).await.unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition5.id, - min_time: Timestamp::new(4), // overlapped - max_time: Timestamp::new(6), - row_count: 2, - compaction_level: CompactionLevel::FileNonOverlapped, // L1 - ..param.clone() - }; - let _pf5_2 = txn.parquet_files().create(paramf).await.unwrap(); - // update sort key - txn.partitions() - .update_sort_key(partition5.id, &["tag", "time"]) - .await - .unwrap(); + let partition5 = table.with_shard(&shard).create_partition("five").await; + let pf5_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(2) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition5.create_parquet_file_catalog_record(pf5_1).await; + + let pf5_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf5_1 + .with_max_time(6) + .with_row_count(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition5.create_parquet_file_catalog_record(pf5_2).await; // P6: // L0 with 2 rows.bytes = 1125 * 2 = 2,250 // L1 with 2 rows.bytes = 1125 * 2 = 2,250 // total = 4,500 - let partition6 = txn - .partitions() - .create_or_get("six".into(), shard.id, table.id) - .await - .unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition6.id, - min_time: Timestamp::new(1), - max_time: Timestamp::new(5), - row_count: 2, - compaction_level: CompactionLevel::Initial, // L0 - ..param.clone() - }; - let _pf6_1 = txn.parquet_files().create(paramf).await.unwrap(); - let paramf = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition6.id, - min_time: Timestamp::new(4), // overlapped - max_time: Timestamp::new(6), - row_count: 2, - compaction_level: CompactionLevel::FileNonOverlapped, // L1 - ..param.clone() - }; - let _pf6_2 = txn.parquet_files().create(paramf).await.unwrap(); - // update sort key - txn.partitions() - .update_sort_key(partition6.id, &["tag", "time"]) - .await - .unwrap(); + let partition6 = table.with_shard(&shard).create_partition("six").await; + let pf6_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(2) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition6.create_parquet_file_catalog_record(pf6_1).await; - txn.commit().await.unwrap(); + let pf6_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf6_1 + .with_max_time(6) + .with_row_count(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition6.create_parquet_file_catalog_record(pf6_2).await; // partition candidates: partitions with L0 and overlapped L1 let candidates = compactor @@ -633,7 +470,7 @@ mod tests { // column types of the partitions let table_columns = compactor.table_columns(&candidates).await.unwrap(); assert_eq!(table_columns.len(), 1); - let mut cols = table_columns.get(&table.id).unwrap().clone(); + let mut cols = table_columns.get(&table.table.id).unwrap().clone(); assert_eq!(cols.len(), 5); cols.sort_by_key(|c| c.col_type); let mut expected_cols = vec![ @@ -679,7 +516,7 @@ mod tests { // https://github.com/influxdata/conductor/issues/1130 // "hot compaction failed: 1, "Could not serialize and persist record batches failed to peek record stream schema" compact_hot_partition_candidates( - Arc::new(compactor), + Arc::clone(&compactor), mock_compaction, sorted_candidates, table_columns, @@ -697,21 +534,21 @@ mod tests { let g1_candidate1 = &group1[0]; assert_eq!(g1_candidate1.budget_bytes(), 4500); - assert_eq!(g1_candidate1.partition.id(), partition1.id); + assert_eq!(g1_candidate1.partition.id(), partition1.partition.id); let g1_candidate1_pf_ids: Vec<_> = g1_candidate1.files.iter().map(|pf| pf.id.get()).collect(); assert_eq!(g1_candidate1_pf_ids, vec![2, 1]); let g1_candidate2 = &group1[1]; assert_eq!(g1_candidate2.budget_bytes(), 4500); - assert_eq!(g1_candidate2.partition.id(), partition2.id); + assert_eq!(g1_candidate2.partition.id(), partition2.partition.id); let g1_candidate2_pf_ids: Vec<_> = g1_candidate2.files.iter().map(|pf| pf.id.get()).collect(); assert_eq!(g1_candidate2_pf_ids, vec![4, 3]); let g1_candidate3 = &group1[2]; assert_eq!(g1_candidate3.budget_bytes(), 4500); - assert_eq!(g1_candidate3.partition.id(), partition5.id); + assert_eq!(g1_candidate3.partition.id(), partition5.partition.id); let g1_candidate3_pf_ids: Vec<_> = g1_candidate3.files.iter().map(|pf| pf.id.get()).collect(); assert_eq!(g1_candidate3_pf_ids, vec![10, 9]); @@ -722,7 +559,7 @@ mod tests { let g2_candidate1 = &group2[0]; assert_eq!(g2_candidate1.budget_bytes(), 4500); - assert_eq!(g2_candidate1.partition.id(), partition6.id); + assert_eq!(g2_candidate1.partition.id(), partition6.partition.id); let g2_candidate1_pf_ids: Vec<_> = g2_candidate1.files.iter().map(|pf| pf.id.get()).collect(); assert_eq!(g2_candidate1_pf_ids, vec![12, 11]); @@ -733,7 +570,7 @@ mod tests { let g3_candidate1 = &group3[0]; assert_eq!(g3_candidate1.budget_bytes(), 11250); - assert_eq!(g3_candidate1.partition.id(), partition3.id); + assert_eq!(g3_candidate1.partition.id(), partition3.partition.id); let g3_candidate1_pf_ids: Vec<_> = g3_candidate1.files.iter().map(|pf| pf.id.get()).collect(); assert_eq!(g3_candidate1_pf_ids, vec![6, 5]); @@ -763,4 +600,52 @@ mod tests { memory_budget_bytes, ) } + + struct TestSetup { + compactor: Arc, + shard: Arc, + table: Arc, + } + + async fn test_setup() -> TestSetup { + let catalog = TestCatalog::new(); + let namespace = catalog + .create_namespace("namespace_hot_partitions_to_compact") + .await; + let shard = namespace.create_shard(1).await; + + // Create a scenario of a table of 5 columns: tag, time, field int, field string, field + // bool. Thus, each file will have estimated memory bytes = 1125 * row_count (for even + // row_counts). + let table = namespace.create_table("test_table").await; + + table.create_column("tag", ColumnType::Tag).await; + table.create_column("time", ColumnType::Time).await; + table.create_column("field_int", ColumnType::I64).await; + table + .create_column("field_string", ColumnType::String) + .await; + table.create_column("field_bool", ColumnType::Bool).await; + + // Create a compactor + // Compactor budget : 13,500 + let time_provider = Arc::new(SystemProvider::new()); + let config = make_compactor_config(); + let compactor = Arc::new(Compactor::new( + vec![shard.shard.id], + Arc::clone(&catalog.catalog), + ParquetStorage::new(Arc::clone(&catalog.object_store)), + Arc::new(Executor::new(1)), + time_provider, + BackoffConfig::default(), + config, + Arc::new(metric::Registry::new()), + )); + + TestSetup { + compactor, + shard, + table, + } + } } From 62b8819d49111bc5037bc9c54dfe793305fd2c68 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 31 Aug 2022 14:58:46 -0400 Subject: [PATCH 4/4] fix: Carry object store ID through test builder, but pick new every time --- iox_tests/src/util.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 075996e917..bfbaed1b53 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -494,7 +494,6 @@ impl TestPartition { let record_batch = record_batch.expect("A record batch is required"); let table = table.expect("A table is required"); let schema = schema.expect("A schema is required"); - assert_eq!( table, self.table.table.name, "Table name of line protocol and partition should have matched", @@ -509,6 +508,8 @@ impl TestPartition { let (record_batch, sort_key) = sort_batch(record_batch, schema.clone()); let record_batch = dedup_batch(record_batch, &sort_key); + let object_store_id = object_store_id.unwrap_or_else(Uuid::new_v4); + let metadata = IoxMetadata { object_store_id, creation_timestamp: now(), @@ -541,7 +542,7 @@ impl TestPartition { creation_time, compaction_level, to_delete, - object_store_id, + object_store_id: Some(object_store_id), row_count: None, // will be computed from the record batch again }; @@ -599,7 +600,7 @@ impl TestPartition { namespace_id: self.namespace.namespace.id, table_id: self.table.table.id, partition_id: self.partition.id, - object_store_id, + object_store_id: object_store_id.unwrap_or_else(Uuid::new_v4), max_sequence_number, min_time: Timestamp::new(min_time), max_time: Timestamp::new(max_time), @@ -649,7 +650,7 @@ pub struct TestParquetFileBuilder { creation_time: i64, compaction_level: CompactionLevel, to_delete: bool, - object_store_id: Uuid, + object_store_id: Option, row_count: Option, } @@ -666,7 +667,7 @@ impl Default for TestParquetFileBuilder { creation_time: 1, compaction_level: CompactionLevel::Initial, to_delete: false, - object_store_id: Uuid::new_v4(), + object_store_id: None, row_count: None, } }