From d171697fd78298877fe5f6db01e05ac0cd34090c Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Fri, 30 Sep 2022 11:54:00 -0400
Subject: [PATCH] feat: always pick cold partitions in next cycle even if it
 has been pa… (#5772)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* fix: always pick cold partitions in next cycle even if it has been partially compacted recently

* fix: comment

* fix: test output

* refactor: using var instead of literal

* fix: consider deleted L0s for recent writes

* chore: cleanup

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 influxdb_iox/tests/end_to_end_cases/cli.rs |   4 +-
 iox_catalog/src/interface.rs               | 230 +++++++++++++++++----
 iox_catalog/src/mem.rs                     |  29 ++-
 iox_catalog/src/postgres.rs                |  15 +-
 4 files changed, 224 insertions(+), 54 deletions(-)

diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs
index 7ed0af1ecd..89f868cae8 100644
--- a/influxdb_iox/tests/end_to_end_cases/cli.rs
+++ b/influxdb_iox/tests/end_to_end_cases/cli.rs
@@ -357,7 +357,7 @@ async fn compact_and_get_remote_partition() {
     //   "maxTime": "123456",
     //   "fileSizeBytes": "2029",
     //   "rowCount": "1",
-    //   "compactionLevel": "1",
+    //   "compactionLevel": "2",
     //   "createdAt": "1650019674289347000"
     // }
@@ -375,7 +375,7 @@ async fn compact_and_get_remote_partition() {
             predicate::str::contains(r#""id": "1""#)
                 .and(predicate::str::contains(r#""shardId": "1","#))
                 .and(predicate::str::contains(r#""partitionId": "1","#))
-                .and(predicate::str::contains(r#""compactionLevel": 1"#)),
+                .and(predicate::str::contains(r#""compactionLevel": 2"#)),
        )
        .get_output()
        .stdout
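Note on the two expectation changes above: a fully compacted file now comes out at the terminal level 2 rather than level 1. For orientation, here is a minimal sketch of the level enum as this patch uses it; the `Initial` and `FileNonOverlapped` names appear in the hunks below, while the numeric values and the `Final` variant name are assumptions inferred from the expected `"compactionLevel": 2` output, not taken from this diff.

// Sketch only; names Initial/FileNonOverlapped come from the patch,
// the discriminant values and `Final` are assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionLevel {
    /// Freshly ingested L0 file; may overlap its neighbours.
    Initial = 0,
    /// L1 file produced by compaction; no longer overlaps other L1 files.
    FileNonOverlapped = 1,
    /// Fully compacted L2 file, the last level a file can reach.
    Final = 2,
}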
diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 8c9853466e..431c22cdb7 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -2867,8 +2867,8 @@ pub(crate) mod test_helpers {
             partitions,
         );
 
-        // The DB has 1 partition but it does not have any files
-        let partition = repos
+        // The DB has 1 partition, partition_1, but it does not have any files
+        let partition_1 = repos
             .partitions()
             .create_or_get("one".into(), shard.id, table.id)
             .await
             .unwrap();
@@ -2884,12 +2884,12 @@ pub(crate) mod test_helpers {
             partitions,
         );
 
-        // The partition has one deleted file
+        // partition_1 has one deleted file
         let parquet_file_params = ParquetFileParams {
             shard_id: shard.id,
             namespace_id: namespace.id,
-            table_id: partition.table_id,
-            partition_id: partition.id,
+            table_id: partition_1.table_id,
+            partition_id: partition_1.id,
             object_store_id: Uuid::new_v4(),
             max_sequence_number: SequenceNumber::new(140),
             min_time: Timestamp::new(1),
@@ -2921,7 +2921,7 @@ pub(crate) mod test_helpers {
             partitions,
         );
 
-        // A partition with one cold file and one hot file
+        // hot_partition has one cold file and one hot file
         let hot_partition = repos
             .partitions()
             .create_or_get("hot".into(), shard.id, table.id)
@@ -2955,7 +2955,7 @@ pub(crate) mod test_helpers {
             partitions,
         );
 
-        // A partition that has only one non-deleted level 2 file, should never be returned
+        // already_compacted_partition has only one non-deleted level 2 file and should never be returned
         let already_compacted_partition = repos
             .partitions()
             .create_or_get("already_compacted".into(), shard.id, table.id)
@@ -2983,7 +2983,7 @@ pub(crate) mod test_helpers {
             partitions,
         );
 
-        // The partition has one non-deleted level 0 file
+        // partition_1 has one non-deleted level 0 file created 38 hours ago
         let l0_file_params = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             ..parquet_file_params.clone()
         };
@@ -3000,18 +3000,17 @@ pub(crate) mod test_helpers {
             .unwrap();
         assert_eq!(partitions.len(), 1);
 
-        // The DB has 3 partitions; 2 have non-deleted L0 files
-        let another_partition = repos
+        // partition_2 has 2 non-deleted L0 files created 38 hours ago
+        let partition_2 = repos
             .partitions()
             .create_or_get("two".into(), shard.id, table.id)
             .await
             .unwrap();
         let another_file_params = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
-            partition_id: another_partition.id,
+            partition_id: partition_2.id,
             ..parquet_file_params.clone()
         };
-        // The new partition has 2 non-deleted L0 files
         repos
             .parquet_files()
             .create(another_file_params.clone())
@@ -3033,19 +3032,23 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
         assert_eq!(partitions.len(), 2);
-        // They must be in order another_partition (more files), partition
-        assert_eq!(partitions[0].partition_id, another_partition.id); // 2 files
-        assert_eq!(partitions[1].partition_id, partition.id); // 1 file
+        // They must be in order partition_2 (more files), then partition_1
+        assert_eq!(partitions[0].partition_id, partition_2.id); // 2 files
+        assert_eq!(partitions[1].partition_id, partition_1.id); // 1 file
 
-        // The DB now has 3 partitions with non-deleted L0 files
-        let third_partition = repos
+        // Make partition_3, which has one level-1 file and no level-0
+        // The DB now has 3 cold partitions: two with non-deleted L0 files and one with only a non-deleted L1
+        let partition_3 = repos
             .partitions()
             .create_or_get("three".into(), shard.id, table.id)
             .await
             .unwrap();
+        // recent L1, but with no L0 this partition is still cold
         let file_params = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
-            partition_id: third_partition.id,
+            partition_id: partition_3.id,
+            compaction_level: CompactionLevel::FileNonOverlapped,
+            created_at: time_five_hour_ago,
             ..parquet_file_params.clone()
         };
         repos
@@ -3053,31 +3056,35 @@ pub(crate) mod test_helpers {
             .create(file_params.clone())
             .await
             .unwrap();
-        // Still return 2 partitions the limit num_partitions=2
+
+        // Still return 2 partitions because the limit num_partitions is 2
         let partitions = repos
             .parquet_files()
             .most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
             .await
             .unwrap();
         assert_eq!(partitions.len(), 2);
+        // and the first one should still be partition_2, the one with the most files
+        assert_eq!(partitions[0].partition_id, partition_2.id);
+
+        // Return 3 partitions because the limit num_partitions is now 5
+        let partitions = repos
+            .parquet_files()
+            .most_cold_files_partitions(shard.id, time_8_hours_ago, 5)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 3);
         // and the first one should still be the one with the most files
-        assert_eq!(partitions[0].partition_id, another_partition.id);
+        assert_eq!(partitions[0].partition_id, partition_2.id);
 
-        // The compactor skipped compacting another_partition
+        // The compactor skipped compacting partition_2
         repos
             .partitions()
-            .record_skipped_compaction(
-                another_partition.id,
-                "Not feeling up to it today",
-                1,
-                2,
-                10,
-                20,
-            )
+            .record_skipped_compaction(partition_2.id, "Not feeling up to it today", 1, 2, 10, 20)
             .await
             .unwrap();
 
-        // another_partition should no longer be selected for compaction
+        // partition_2 should no longer be selected for compaction
         let partitions = repos
             .parquet_files()
             .most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
@@ -3085,16 +3092,29 @@ pub(crate) mod test_helpers {
             .unwrap();
         assert_eq!(partitions.len(), 2);
         assert!(
-            partitions
-                .iter()
-                .all(|p| p.partition_id != another_partition.id),
+            partitions.iter().all(|p| p.partition_id != partition_2.id),
             "Expected partitions not to include {}: {partitions:?}",
-            another_partition.id
+            partition_2.id
         );
 
-        // The DB now has 4 partitions, one of which has 3 non-deleted L1 files, another_partition
-        // should still be skipped
-        let fourth_partition = repos
+        // Add another L1 file to partition_3 so it has 2 L1 files, making the output easier to check
+        // A non-recent L1
+        let file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition_3.id,
+            compaction_level: CompactionLevel::FileNonOverlapped,
+            created_at: time_38_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(file_params.clone())
+            .await
+            .unwrap();
+
+        // Create partition_4 with 3 non-deleted L1 files
+        // The DB now has 4 cold partitions, but partition_2 should still be skipped
+        let partition_4 = repos
             .partitions()
             .create_or_get("four".into(), shard.id, table.id)
             .await
@@ -3102,7 +3122,7 @@ pub(crate) mod test_helpers {
         for _ in 0..3 {
             let file_params = ParquetFileParams {
                 object_store_id: Uuid::new_v4(),
-                partition_id: fourth_partition.id,
+                partition_id: partition_4.id,
                 compaction_level: CompactionLevel::FileNonOverlapped,
                 ..parquet_file_params.clone()
             };
@@ -3119,8 +3139,138 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
         assert_eq!(partitions.len(), 2);
-        // the first one should now be the one with the most files, which happen to be L1
-        assert_eq!(partitions[0].partition_id, fourth_partition.id);
+        // the first one should now be the one with the most files: 3 L1s
+        assert_eq!(partitions[0].partition_id, partition_4.id);
+        // second one should be partition_3 with 2 files: 2 L1s
+        assert_eq!(partitions[1].partition_id, partition_3.id);
+
+        // Return 3 partitions with the limit num_partitions=4
+        let partitions = repos
+            .parquet_files()
+            .most_cold_files_partitions(shard.id, time_8_hours_ago, 4)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 3);
+        // the first one should now be the one with the most files: 3 L1s
+        assert_eq!(partitions[0].partition_id, partition_4.id);
+        // second one should be partition_3 with 2 files: 2 L1s
+        assert_eq!(partitions[1].partition_id, partition_3.id);
+        // third one should be partition_1 with 1 file: 1 L0
+        assert_eq!(partitions[2].partition_id, partition_1.id);
+
+        // partition_5 has a non-deleted L1 and a deleted L0 created recently
+        // The DB still has 4 cold partitions, and partition_2 should still be skipped;
+        // partition_5 is hot because it has a recent L0, even though that L0 is deleted
+        let partition_5 = repos
+            .partitions()
+            .create_or_get("five".into(), shard.id, table.id)
+            .await
+            .unwrap();
+        // L1 created recently
+        let file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition_5.id,
+            compaction_level: CompactionLevel::FileNonOverlapped,
+            created_at: time_five_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(file_params.clone())
+            .await
+            .unwrap();
+        // L0 created recently but deleted
+        let file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition_5.id,
+            compaction_level: CompactionLevel::Initial,
+            created_at: time_five_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        let delete_l0_file = repos
+            .parquet_files()
+            .create(file_params.clone())
+            .await
+            .unwrap();
+        repos
+            .parquet_files()
+            .flag_for_delete(delete_l0_file.id)
+            .await
+            .unwrap();
+
+        // Return 3 cold partitions, partition_1, partition_3, and partition_4, because num_partitions=5;
+        // partition_2 is still skipped, and partition_5 is considered hot because it has a (deleted) L0 created recently
+        let partitions = repos
+            .parquet_files()
+            .most_cold_files_partitions(shard.id, time_8_hours_ago, 5)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 3);
+        // the first one should now be the one with the most files: 3 L1s
+        assert_eq!(partitions[0].partition_id, partition_4.id);
+        // second one should be partition_3 with 2 files: 2 L1s
+        assert_eq!(partitions[1].partition_id, partition_3.id);
+        // third one should be partition_1 with 1 file: 1 L0
+        assert_eq!(partitions[2].partition_id, partition_1.id);
+
+        // Create partition_6 with 4 L1s and one deleted but non-recent L0
+        // The DB now has 5 cold partitions, but partition_2 should still be skipped
+        let partition_6 = repos
+            .partitions()
+            .create_or_get("six".into(), shard.id, table.id)
+            .await
+            .unwrap();
+        for _ in 0..4 {
+            // L1 created recently
+            let file_params = ParquetFileParams {
+                object_store_id: Uuid::new_v4(),
+                partition_id: partition_6.id,
+                compaction_level: CompactionLevel::FileNonOverlapped,
+                created_at: time_five_hour_ago,
+                ..parquet_file_params.clone()
+            };
+            repos
+                .parquet_files()
+                .create(file_params.clone())
+                .await
+                .unwrap();
+        }
+        // old and deleted L0
+        let file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition_6.id,
+            compaction_level: CompactionLevel::Initial,
+            created_at: time_38_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        let delete_l0_file = repos
+            .parquet_files()
+            .create(file_params.clone())
+            .await
+            .unwrap();
+        repos
+            .parquet_files()
+            .flag_for_delete(delete_l0_file.id)
+            .await
+            .unwrap();
+
+        // Return 4 cold partitions, partition_1, partition_3, partition_4, and partition_6, because num_partitions=5;
+        // partition_2 is still skipped, and partition_5 is considered hot because it has a (deleted) L0 created recently
+        let partitions = repos
+            .parquet_files()
+            .most_cold_files_partitions(shard.id, time_8_hours_ago, 5)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 4);
+        // the first one should now be the one with the most files: 4 L1s
+        assert_eq!(partitions[0].partition_id, partition_6.id);
+        // then partition_4 with 3 files: 3 L1s
+        assert_eq!(partitions[1].partition_id, partition_4.id);
+        // then partition_3 with 2 files: 2 L1s
+        assert_eq!(partitions[2].partition_id, partition_3.id);
+        // then partition_1 with 1 file: 1 L0
+        assert_eq!(partitions[3].partition_id, partition_1.id);
     }
 
     async fn test_recent_highest_throughput_partitions(catalog: Arc<dyn Catalog>) {
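The test above pins down the selection contract of `most_cold_files_partitions`: a partition is cold only if every L0 file it has ever had, deleted or not, predates the cutoff; deleted L1s are ignored; a candidate needs at least one live file; and results are ordered by live-file count descending, capped at `num_partitions`, with partitions recorded in `skipped_compactions` excluded. Below is a standalone restatement of that rule. `Level`, `FileInfo`, `is_cold`, and `rank_cold` are hypothetical illustrations, not IOx types, and the skipped-partition filter is left out.

#[derive(Clone, Copy, PartialEq)]
enum Level { L0, L1 }

struct FileInfo {
    level: Level,
    created_at: i64, // nanoseconds since epoch, like `Timestamp` in the patch
    deleted: bool,
}

/// A partition is cold when no L0 file, deleted or not, was created at or
/// after `cutoff`; deleted L0s still count as recent writes.
fn is_cold(files: &[FileInfo], cutoff: i64) -> bool {
    files
        .iter()
        .filter(|f| f.level == Level::L0)
        .all(|f| f.created_at < cutoff)
}

/// Cold partitions with at least one live (non-deleted) file, ordered by
/// live-file count descending, mirroring the revised SQL in postgres.rs below.
fn rank_cold(partitions: &[(i64, Vec<FileInfo>)], cutoff: i64) -> Vec<i64> {
    let mut cold: Vec<(i64, usize)> = partitions
        .iter()
        .filter(|(_, files)| is_cold(files, cutoff))
        .map(|(id, files)| (*id, files.iter().filter(|f| !f.deleted).count()))
        .filter(|(_, live)| *live > 0)
        .collect();
    cold.sort_by(|a, b| b.1.cmp(&a.1));
    cold.into_iter().map(|(id, _)| id).collect()
}

fn main() {
    let cutoff = 1_000;
    let parts = vec![
        (1, vec![FileInfo { level: Level::L0, created_at: 900, deleted: false }]),
        (2, vec![
            FileInfo { level: Level::L1, created_at: 1_200, deleted: false },
            FileInfo { level: Level::L0, created_at: 1_100, deleted: true },
        ]),
    ];
    // Partition 2 has a recent (deleted) L0, so only partition 1 qualifies.
    assert_eq!(rank_cold(&parts, cutoff), vec![1]);
}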
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index 7c4856a803..24260bacda 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -1298,7 +1298,6 @@ impl ParquetFileRepo for MemTxn {
                 f.shard_id == shard_id
                     && (f.compaction_level == CompactionLevel::Initial
                         || f.compaction_level == CompactionLevel::FileNonOverlapped)
-                    && f.to_delete.is_none()
             })
             .collect::<Vec<_>>();
@@ -1313,11 +1312,29 @@ impl ParquetFileRepo for MemTxn {
                 namespace_id: pf.namespace_id,
                 table_id: pf.table_id,
             };
-            let count = partition_duplicate_count.entry(key).or_insert(0);
-            *count += 1;
-            let max_created_at = partition_max_created_at.entry(key).or_insert(pf.created_at);
-            if pf.created_at > *max_created_at {
-                *max_created_at = pf.created_at;
+
+            if pf.to_delete.is_none() {
+                let count = partition_duplicate_count.entry(key).or_insert(0);
+                *count += 1;
+            }
+
+            let created_at = if pf.compaction_level == CompactionLevel::Initial {
+                // level-0: use its created_at time, even if the file is deleted
+                Some(pf.created_at)
+            } else if pf.to_delete.is_none() {
+                // non-deleted level-1: pin it to `time_in_the_past - 1` so it
+                // always counts as cold
+                Some(time_in_the_past - 1)
+            } else {
+                // deleted level-1 files are ignored
+                None
+            };
+
+            if let Some(created_at) = created_at {
+                let max_created_at = partition_max_created_at.entry(key).or_insert(created_at);
+                *max_created_at = std::cmp::max(*max_created_at, created_at);
+            }
         }
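The mem.rs hunk mirrors the contract in the in-memory catalog: live files feed the per-partition count, L0s contribute their real `created_at` even when deleted, live L1s are pinned just below the cutoff so they can never mark a partition hot, and deleted L1s drop out. A small, self-contained illustration of that bookkeeping, using a made-up partition key, cutoff, and file list rather than the real `PartitionParam` key type:

use std::collections::HashMap;

fn main() {
    let time_in_the_past = 1_000i64; // the cold cutoff passed in as `$2`
    // (level, created_at, deleted) triples for one partition
    let files = [(0u8, 900i64, false), (1, 1_200, false), (1, 800, true)];

    let mut max_created_at: HashMap<&str, i64> = HashMap::new();
    for &(level, created_at, deleted) in &files {
        // L0s always count, even when deleted; live L1s are pinned below the
        // cutoff so they can never make the partition look hot; deleted L1s
        // are ignored entirely.
        let effective = match (level, deleted) {
            (0, _) => Some(created_at),
            (1, false) => Some(time_in_the_past - 1),
            _ => None,
        };
        if let Some(t) = effective {
            let e = max_created_at.entry("partition_one").or_insert(t);
            *e = (*e).max(t);
        }
    }
    // The newest effective timestamp here is the pinned L1 value 999, which
    // is below the cutoff, so this partition is classified as cold.
    assert_eq!(max_created_at["partition_one"], time_in_the_past - 1);
}

diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index e88e49f5db..7544e65370 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -1873,21 +1873,24 @@ LIMIT $4;
     ) -> Result<Vec<PartitionParam>> {
         let num_partitions = num_partitions as i32;

-        // The preliminary performance test says this query runs around 50ms
-        // We have index on (shard_id, comapction_level, to_delete)
+        // This query returns the partitions with the most L0 + L1 files, where all L0 files
+        // (deleted and non-deleted alike) were either created before the given time ($2)
+        // or are no longer available (removed by the garbage collector)
         sqlx::query_as::<_, PartitionParam>(
             r#"
 SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id,
-       parquet_file.table_id, count(parquet_file.id), max(parquet_file.created_at)
+       parquet_file.table_id,
+       count(case when to_delete is null then 1 end) total_count,
+       max(case when compaction_level = $4 then parquet_file.created_at end)
 FROM parquet_file
 LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
 WHERE (compaction_level = $4 OR compaction_level = $5)
-AND to_delete IS NULL
 AND shard_id = $1
 AND skipped_compactions.partition_id IS NULL
 GROUP BY 1, 2, 3, 4
-HAVING max(created_at) < $2
-ORDER BY 5 DESC
+HAVING count(case when to_delete is null then 1 end) > 0
+   AND (max(case when compaction_level = $4 then parquet_file.created_at end) < $2 OR
+        max(case when compaction_level = $4 then parquet_file.created_at end) is null)
+ORDER BY total_count DESC
 LIMIT $3;
 "#,
         )

The rewritten HAVING clause is the heart of the fix: a partition qualifies when it still has at least one live file, and its newest L0, counting deleted ones, is either older than $2 or absent entirely. Restated as a plain function for clarity, with hypothetical names and made-up timestamps:

/// `live_file_count` plays the role of `count(case when to_delete is null ...)`,
/// `max_l0_created_at` the role of `max(case when compaction_level = $4 ...)`,
/// which is NULL (None) when the partition has no L0 rows at all.
fn qualifies(live_file_count: u64, max_l0_created_at: Option<i64>, cutoff: i64) -> bool {
    live_file_count > 0 && max_l0_created_at.map_or(true, |t| t < cutoff)
}

fn main() {
    let cutoff = 1_000;
    assert!(qualifies(3, Some(900), cutoff));    // old L0s only: cold
    assert!(qualifies(2, None, cutoff));         // L1-only partition: cold
    assert!(!qualifies(2, Some(1_200), cutoff)); // recent (even deleted) L0: hot
    assert!(!qualifies(0, Some(900), cutoff));   // nothing live left to compact
}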