feat: always pick cold partitions in the next cycle even if they have been partially compacted recently (#5772)

* fix: always pick cold partitions in the next cycle even if they have been partially compacted recently

* fix: comment

* fix: test output

* refactor: using var instead of literal

* fix: consider deleted L0s for recent writes

* chore: cleanup

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Nga Tran 2022-09-30 11:54:00 -04:00 committed by GitHub
parent d7677c1b1d
commit d171697fd7
4 changed files with 224 additions and 54 deletions


@@ -357,7 +357,7 @@ async fn compact_and_get_remote_partition() {
    //       "maxTime": "123456",
    //       "fileSizeBytes": "2029",
    //       "rowCount": "1",
-   //       "compactionLevel": "1",
+   //       "compactionLevel": "2",
    //       "createdAt": "1650019674289347000"
    //     }
@@ -375,7 +375,7 @@ async fn compact_and_get_remote_partition() {
        predicate::str::contains(r#""id": "1""#)
            .and(predicate::str::contains(r#""shardId": "1","#))
            .and(predicate::str::contains(r#""partitionId": "1","#))
-           .and(predicate::str::contains(r#""compactionLevel": 1"#)),
+           .and(predicate::str::contains(r#""compactionLevel": 2"#)),
    )
    .get_output()
    .stdout


@@ -2867,8 +2867,8 @@ pub(crate) mod test_helpers {
             partitions,
         );
-        // The DB has 1 partition but it does not have any files
-        let partition = repos
+        // The DB has 1 partition, partition_1, but it does not have any files
+        let partition_1 = repos
             .partitions()
             .create_or_get("one".into(), shard.id, table.id)
             .await
@@ -2884,12 +2884,12 @@ pub(crate) mod test_helpers {
             partitions,
         );
-        // The partition has one deleted file
+        // partition_1 has one deleted file
         let parquet_file_params = ParquetFileParams {
             shard_id: shard.id,
             namespace_id: namespace.id,
-            table_id: partition.table_id,
-            partition_id: partition.id,
+            table_id: partition_1.table_id,
+            partition_id: partition_1.id,
             object_store_id: Uuid::new_v4(),
             max_sequence_number: SequenceNumber::new(140),
             min_time: Timestamp::new(1),
@@ -2921,7 +2921,7 @@ pub(crate) mod test_helpers {
             partitions,
         );
-        // A partition with one cold file and one hot file
+        // A hot_partition with one cold file and one hot file
         let hot_partition = repos
             .partitions()
             .create_or_get("hot".into(), shard.id, table.id)
@@ -2955,7 +2955,7 @@ pub(crate) mod test_helpers {
             partitions,
         );
-        // A partition that has only one non-deleted level 2 file, should never be returned
+        // An already_compacted_partition that has only one non-deleted level 2 file; it should never be returned
         let already_compacted_partition = repos
             .partitions()
             .create_or_get("already_compacted".into(), shard.id, table.id)
@@ -2983,7 +2983,7 @@ pub(crate) mod test_helpers {
             partitions,
         );
-        // The partition has one non-deleted level 0 file
+        // partition_1 has one non-deleted level 0 file created 38 hours ago
         let l0_file_params = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
             ..parquet_file_params.clone()
@@ -3000,18 +3000,17 @@ pub(crate) mod test_helpers {
             .unwrap();
         assert_eq!(partitions.len(), 1);
-        // The DB has 3 partitions; 2 have non-deleted L0 files
-        let another_partition = repos
+        // partition_2 has 2 non-deleted L0 files created 38 hours ago
+        let partition_2 = repos
             .partitions()
             .create_or_get("two".into(), shard.id, table.id)
             .await
             .unwrap();
         let another_file_params = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
-            partition_id: another_partition.id,
+            partition_id: partition_2.id,
             ..parquet_file_params.clone()
         };
-        // The new partition has 2 non-deleted L0 files
         repos
             .parquet_files()
             .create(another_file_params.clone())
@@ -3033,19 +3032,23 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
         assert_eq!(partitions.len(), 2);
-        // They must be in order another_partition (more files), partition
-        assert_eq!(partitions[0].partition_id, another_partition.id); // 2 files
-        assert_eq!(partitions[1].partition_id, partition.id); // 1 file
-        // The DB now has 3 partitions with non-deleted L0 files
-        let third_partition = repos
+        // They must be in order partition_2 (more files), then partition_1
+        assert_eq!(partitions[0].partition_id, partition_2.id); // 2 files
+        assert_eq!(partitions[1].partition_id, partition_1.id); // 1 file
+        // Make partition_3 that has one level-1 file and no level-0.
+        // The DB now has 3 cold partitions: two with non-deleted L0 files and one with only a non-deleted L1
+        let partition_3 = repos
             .partitions()
             .create_or_get("three".into(), shard.id, table.id)
             .await
             .unwrap();
+        // a recent L1, but since there is no L0, this partition is still cold
         let file_params = ParquetFileParams {
             object_store_id: Uuid::new_v4(),
-            partition_id: third_partition.id,
+            partition_id: partition_3.id,
+            compaction_level: CompactionLevel::FileNonOverlapped,
+            created_at: time_five_hour_ago,
             ..parquet_file_params.clone()
         };
         repos
@@ -3053,31 +3056,35 @@ pub(crate) mod test_helpers {
             .create(file_params.clone())
             .await
             .unwrap();
-        // Still return 2 partitions the limit num_partitions=2
+        // Still return 2 partitions because the limit num_partitions is 2
         let partitions = repos
             .parquet_files()
             .most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
             .await
             .unwrap();
         assert_eq!(partitions.len(), 2);
+        // and the first one should still be the one with the most files, partition_2
+        assert_eq!(partitions[0].partition_id, partition_2.id);
+        //
+        // return 3 partitions because the limit num_partitions is now 5
+        let partitions = repos
+            .parquet_files()
+            .most_cold_files_partitions(shard.id, time_8_hours_ago, 5)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 3);
         // and the first one should still be the one with the most files
-        assert_eq!(partitions[0].partition_id, another_partition.id);
-        // The compactor skipped compacting another_partition
+        assert_eq!(partitions[0].partition_id, partition_2.id);
+        // The compactor skipped compacting partition_2
         repos
             .partitions()
-            .record_skipped_compaction(
-                another_partition.id,
-                "Not feeling up to it today",
-                1,
-                2,
-                10,
-                20,
-            )
+            .record_skipped_compaction(partition_2.id, "Not feeling up to it today", 1, 2, 10, 20)
             .await
             .unwrap();
-        // another_partition should no longer be selected for compaction
+        // partition_2 should no longer be selected for compaction
         let partitions = repos
             .parquet_files()
             .most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
@@ -3085,16 +3092,29 @@ pub(crate) mod test_helpers {
             .unwrap();
         assert_eq!(partitions.len(), 2);
         assert!(
-            partitions
-                .iter()
-                .all(|p| p.partition_id != another_partition.id),
+            partitions.iter().all(|p| p.partition_id != partition_2.id),
             "Expected partitions not to include {}: {partitions:?}",
-            another_partition.id
+            partition_2.id
         );
-        // The DB now has 4 partitions, one of which has 3 non-deleted L1 files, another_partition
-        // should still be skipped
-        let fourth_partition = repos
+        // Add another L1 file into partition_3 so it has 2 L1 files, which makes the output easier to check.
+        // A non-recent L1
+        let file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition_3.id,
+            compaction_level: CompactionLevel::FileNonOverlapped,
+            created_at: time_38_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(file_params.clone())
+            .await
+            .unwrap();
+        // Create partition_4 with 3 non-deleted L1 files.
+        // The DB now has 4 cold partitions, but partition_2 should still be skipped
+        let partition_4 = repos
             .partitions()
             .create_or_get("four".into(), shard.id, table.id)
             .await
@@ -3102,7 +3122,7 @@ pub(crate) mod test_helpers {
         for _ in 0..3 {
             let file_params = ParquetFileParams {
                 object_store_id: Uuid::new_v4(),
-                partition_id: fourth_partition.id,
+                partition_id: partition_4.id,
                 compaction_level: CompactionLevel::FileNonOverlapped,
                 ..parquet_file_params.clone()
             };
@@ -3119,8 +3139,138 @@ pub(crate) mod test_helpers {
             .await
             .unwrap();
         assert_eq!(partitions.len(), 2);
-        // the first one should now be the one with the most files, which happen to be L1
-        assert_eq!(partitions[0].partition_id, fourth_partition.id);
+        // the first one should now be the one with the most files: 3 L1s
+        assert_eq!(partitions[0].partition_id, partition_4.id);
+        // second one should be partition_3 with 2 files: 2 L1s
+        assert_eq!(partitions[1].partition_id, partition_3.id);
+        // Return 3 partitions with the limit num_partitions=4
+        let partitions = repos
+            .parquet_files()
+            .most_cold_files_partitions(shard.id, time_8_hours_ago, 4)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 3);
+        // the first one should now be the one with the most files: 3 L1s
+        assert_eq!(partitions[0].partition_id, partition_4.id);
+        // second one should be partition_3 with 2 files: 2 L1s
+        assert_eq!(partitions[1].partition_id, partition_3.id);
+        // third one should be partition_1 with 1 file: 1 L0
+        assert_eq!(partitions[2].partition_id, partition_1.id);
+        // partition_5 with a non-deleted L1 and a deleted L0 created recently.
+        // The DB still has 4 cold partitions and partition_2 should still be skipped;
+        // partition_5 is hot because it has a recent L0, even though that L0 is deleted
+        let partition_5 = repos
+            .partitions()
+            .create_or_get("five".into(), shard.id, table.id)
+            .await
+            .unwrap();
+        // L1 created recently
+        let file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition_5.id,
+            compaction_level: CompactionLevel::FileNonOverlapped,
+            created_at: time_five_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        repos
+            .parquet_files()
+            .create(file_params.clone())
+            .await
+            .unwrap();
+        // L0 created recently but deleted
+        let file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition_5.id,
+            compaction_level: CompactionLevel::Initial,
+            created_at: time_five_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        let delete_l0_file = repos
+            .parquet_files()
+            .create(file_params.clone())
+            .await
+            .unwrap();
+        repos
+            .parquet_files()
+            .flag_for_delete(delete_l0_file.id)
+            .await
+            .unwrap();
+        //
+        // Return 3 cold partitions, partition_1, partition_3, partition_4, because num_partitions=5;
+        // partition_2 is still skipped, and partition_5 is considered hot because it has a (deleted) L0 created recently
+        let partitions = repos
+            .parquet_files()
+            .most_cold_files_partitions(shard.id, time_8_hours_ago, 5)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 3);
+        // the first one should now be the one with the most files: 3 L1s
+        assert_eq!(partitions[0].partition_id, partition_4.id);
+        // second one should be partition_3 with 2 files: 2 L1s
+        assert_eq!(partitions[1].partition_id, partition_3.id);
+        // third one should be partition_1 with 1 file: 1 L0
+        assert_eq!(partitions[2].partition_id, partition_1.id);
+        // Create partition_6 with 4 L1s and one deleted but non-recent L0.
+        // The DB now has 5 cold partitions, but partition_2 should still be skipped
+        let partition_6 = repos
+            .partitions()
+            .create_or_get("six".into(), shard.id, table.id)
+            .await
+            .unwrap();
+        for _ in 0..4 {
+            // L1 created recently
+            let file_params = ParquetFileParams {
+                object_store_id: Uuid::new_v4(),
+                partition_id: partition_6.id,
+                compaction_level: CompactionLevel::FileNonOverlapped,
+                created_at: time_five_hour_ago,
+                ..parquet_file_params.clone()
+            };
+            repos
+                .parquet_files()
+                .create(file_params.clone())
+                .await
+                .unwrap();
+        }
+        // old and deleted L0
+        let file_params = ParquetFileParams {
+            object_store_id: Uuid::new_v4(),
+            partition_id: partition_6.id,
+            compaction_level: CompactionLevel::Initial,
+            created_at: time_38_hour_ago,
+            ..parquet_file_params.clone()
+        };
+        let delete_l0_file = repos
+            .parquet_files()
+            .create(file_params.clone())
+            .await
+            .unwrap();
+        repos
+            .parquet_files()
+            .flag_for_delete(delete_l0_file.id)
+            .await
+            .unwrap();
+        //
+        // Return 4 cold partitions, partition_1, partition_3, partition_4, partition_6, because num_partitions=5;
+        // partition_2 is still skipped, and partition_5 is still considered hot because it has a (deleted) L0 created recently
+        let partitions = repos
+            .parquet_files()
+            .most_cold_files_partitions(shard.id, time_8_hours_ago, 5)
+            .await
+            .unwrap();
+        assert_eq!(partitions.len(), 4);
+        // the first one should now be the one with the most files: 4 L1s
+        assert_eq!(partitions[0].partition_id, partition_6.id);
+        // second should be partition_4 with 3 files: 3 L1s
+        assert_eq!(partitions[1].partition_id, partition_4.id);
+        // then partition_3 with 2 files: 2 L1s
+        assert_eq!(partitions[2].partition_id, partition_3.id);
+        // then partition_1 with 1 file: 1 L0
+        assert_eq!(partitions[3].partition_id, partition_1.id);
     }
     async fn test_recent_highest_throughput_partitions(catalog: Arc<dyn Catalog>) {


@@ -1298,7 +1298,6 @@ impl ParquetFileRepo for MemTxn {
                 f.shard_id == shard_id
                     && (f.compaction_level == CompactionLevel::Initial
                         || f.compaction_level == CompactionLevel::FileNonOverlapped)
-                    && f.to_delete.is_none()
             })
             .collect::<Vec<_>>();
@@ -1313,11 +1312,29 @@ impl ParquetFileRepo for MemTxn {
                 namespace_id: pf.namespace_id,
                 table_id: pf.table_id,
             };
-            let count = partition_duplicate_count.entry(key).or_insert(0);
-            *count += 1;
-            let max_created_at = partition_max_created_at.entry(key).or_insert(pf.created_at);
-            if pf.created_at > *max_created_at {
-                *max_created_at = pf.created_at;
+            if pf.to_delete.is_none() {
+                let count = partition_duplicate_count.entry(key).or_insert(0);
+                *count += 1;
+            }
+            let created_at = if pf.compaction_level == CompactionLevel::Initial {
+                // the file is level-0, so use its created_at time even if it is deleted
+                Some(pf.created_at)
+            } else if pf.to_delete.is_none() {
+                // a non-deleted level-1: use `time_in_the_past - 1` so this partition always counts as cold
+                Some(time_in_the_past - 1)
+            } else {
+                // a deleted level-1 does not count
+                None
+            };
+            if let Some(created_at) = created_at {
+                let max_created_at = partition_max_created_at.entry(key).or_insert(created_at);
+                *max_created_at = std::cmp::max(*max_created_at, created_at);
             }
         }
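
This three-way branch is what keeps the in-memory catalog consistent with the Postgres query: an L0 contributes its real creation time even when flagged for deletion, a live L1 contributes `time_in_the_past - 1` so it can never make the partition look recently written, and a deleted L1 contributes nothing. A compact sketch of just that mapping, with hypothetical names (`Level`, `effective_created_at`) and plain integer timestamps standing in for the catalog types:

#[derive(Clone, Copy)]
enum Level {
    Initial,           // L0
    FileNonOverlapped, // L1
}

// Effective created_at used for the coldness check, per the branch above:
// an L0 keeps its real created_at even when deleted; a live L1 maps to
// `time_in_the_past - 1` (always older than the cutoff); a deleted L1 is ignored.
fn effective_created_at(level: Level, created_at: i64, deleted: bool, time_in_the_past: i64) -> Option<i64> {
    match (level, deleted) {
        (Level::Initial, _) => Some(created_at),
        (Level::FileNonOverlapped, false) => Some(time_in_the_past - 1),
        (Level::FileNonOverlapped, true) => None,
    }
}

fn main() {
    let cutoff = 1_000;
    // A deleted L0 created after the cutoff still marks the partition as recently written.
    assert_eq!(effective_created_at(Level::Initial, 1_500, true, cutoff), Some(1_500));
    // A live L1 never pushes max(created_at) past the cutoff.
    assert_eq!(effective_created_at(Level::FileNonOverlapped, 1_500, false, cutoff), Some(999));
    // A deleted L1 contributes nothing.
    assert_eq!(effective_created_at(Level::FileNonOverlapped, 1_500, true, cutoff), None);
}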


@@ -1873,21 +1873,24 @@ LIMIT $4;
     ) -> Result<Vec<PartitionParam>> {
         let num_partitions = num_partitions as i32;
-        // The preliminary performance test says this query runs around 50ms
-        // We have index on (shard_id, comapction_level, to_delete)
+        // This query returns the partitions with the most L0+L1 files, where all L0 files (deleted
+        // and non-deleted alike) were either created before the given time ($2) or are no longer
+        // available (removed by the garbage collector)
         sqlx::query_as::<_, PartitionParam>(
             r#"
SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id,
-       parquet_file.table_id, count(parquet_file.id), max(parquet_file.created_at)
+       parquet_file.table_id,
+       count(case when to_delete is null then 1 end) total_count,
+       max(case when compaction_level = $4 then parquet_file.created_at end)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE (compaction_level = $4 OR compaction_level = $5)
-    AND to_delete IS NULL
    AND shard_id = $1
    AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
-HAVING max(created_at) < $2
-ORDER BY 5 DESC
+HAVING count(case when to_delete is null then 1 end) > 0
+    AND (max(case when compaction_level = $4 then parquet_file.created_at end) < $2
+         OR max(case when compaction_level = $4 then parquet_file.created_at end) is null)
+ORDER BY total_count DESC
LIMIT $3;
            "#,
        )