fix: Select by number of both L0 and L1 files for cold compaction

Now that we're going to compact level 1 files into level 2 files as well,
cold partition candidates are selected by the combined count of level 0 and
level 1 files rather than level 0 files alone.
pull/24376/head
Carol (Nichols || Goulding) 2022-08-15 15:59:26 -04:00
parent 6bba3fafaa
commit da201ba87f
5 changed files with 119 additions and 42 deletions


@ -66,11 +66,11 @@ pub enum Error {
},
#[snafu(display(
"Error getting the most level 0 file partitions for shard {}. {}",
"Error getting the most level 0 + level 1 file cold partitions for shard {}. {}",
shard_id,
source
))]
MostL0Partitions {
MostColdPartitions {
source: iox_catalog::interface::Error,
shard_id: ShardId,
},
@ -324,9 +324,10 @@ impl Compactor {
/// Return a list of partitions that:
///
/// - Have not received any writes in 8 hours (determined by all parquet files having a
/// created_at time older than 8 hours ago)
/// - Have some level 0 parquet files that need to be upgraded or compacted
/// - Have not received any writes in 8 hours (determined by all level 0 and level 1 parquet
/// files having a created_at time older than 8 hours ago)
/// - Have some level 0 or level 1 parquet files that need to be upgraded or compacted
/// - Sorted by the number of level 0 files + number of level 1 files descending
pub async fn cold_partitions_to_compact(
&self,
// Max number of cold partitions per shard we want to compact
@ -347,13 +348,13 @@ impl Compactor {
let mut partitions = repos
.parquet_files()
.most_level_0_files_partitions(
.most_cold_files_partitions(
*shard_id,
time_8_hours_ago,
max_num_partitions_per_shard,
)
.await
.context(MostL0PartitionsSnafu {
.context(MostColdPartitionsSnafu {
shard_id: *shard_id,
})?;
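For illustration, a minimal sketch (hypothetical types, not the compactor's actual ones) of the selection rule documented above: keep partitions whose level 0/1 files are all older than the cutoff, order them by combined L0 + L1 file count descending, and cap the result at the per-shard limit.

```rust
struct CandidatePartition {
    partition_id: i64,
    l0_and_l1_file_count: usize,
    newest_file_created_at: i64, // nanoseconds since the UNIX epoch
}

fn select_cold_candidates(
    mut partitions: Vec<CandidatePartition>,
    cutoff: i64,
    max_candidates: usize,
) -> Vec<CandidatePartition> {
    // Keep only partitions that have at least one L0/L1 file and whose newest
    // file is older than the cutoff (i.e. no writes in the last 8 hours).
    partitions.retain(|p| p.l0_and_l1_file_count > 0 && p.newest_file_created_at < cutoff);
    // Most-files-first, then cap at the per-shard limit.
    partitions.sort_by(|a, b| b.l0_and_l1_file_count.cmp(&a.l0_and_l1_file_count));
    partitions.truncate(max_candidates);
    partitions
}
```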
@ -953,17 +954,14 @@ mod tests {
let pf1 = txn.parquet_files().create(p1.clone()).await.unwrap();
txn.parquet_files().flag_for_delete(pf1.id).await.unwrap();
//
// partition2 has a cold non-L0 file
// partition2 has a cold L2 file
let p2 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition2.id,
compaction_level: CompactionLevel::Final,
..p1.clone()
};
let pf2 = txn.parquet_files().create(p2).await.unwrap();
txn.parquet_files()
.update_compaction_level(&[pf2.id], CompactionLevel::FileNonOverlapped)
.await
.unwrap();
let _pf2 = txn.parquet_files().create(p2).await.unwrap();
txn.commit().await.unwrap();
// No non-deleted level 0 files yet --> no candidates
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();


@ -592,9 +592,10 @@ pub trait ParquetFileRepo: Send + Sync {
num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
/// List partitions with the most level 0 files created earlier than `older_than_num_hours`
/// hours ago for a given shard. In other words, "cold" partitions that need compaction.
async fn most_level_0_files_partitions(
/// List partitions with the most level 0 + level 1 files created earlier than
/// `older_than_num_hours` hours ago for a given shard. In other words, "cold" partitions
/// that need compaction.
async fn most_cold_files_partitions(
&mut self,
shard_id: ShardId,
time_in_the_past: Timestamp,
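As a hedged illustration of the `time_in_the_past` argument, here is a self-contained sketch of an 8-hour cutoff, assuming `created_at` is stored as nanoseconds since the UNIX epoch (the helper name is hypothetical, not part of the crate):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Nanosecond timestamp for "8 hours before now"; any partition with an L0 or
/// L1 file created after this cutoff is considered hot and is skipped.
fn eight_hours_ago_nanos() -> i64 {
    let cutoff = SystemTime::now() - Duration::from_secs(8 * 60 * 60);
    cutoff
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch")
        .as_nanos() as i64
}
```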
@ -900,7 +901,7 @@ pub(crate) mod test_helpers {
test_parquet_file(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
test_most_level_0_files_partitions(Arc::clone(&catalog)).await;
test_most_cold_files_partitions(Arc::clone(&catalog)).await;
test_recent_highest_throughput_partitions(Arc::clone(&catalog)).await;
test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
test_processed_tombstones(Arc::clone(&catalog)).await;
@ -2749,12 +2750,12 @@ pub(crate) mod test_helpers {
);
}
async fn test_most_level_0_files_partitions(catalog: Arc<dyn Catalog>) {
async fn test_most_cold_files_partitions(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("most_level_0").await.unwrap();
let topic = repos.topics().create_or_get("most_cold").await.unwrap();
let pool = repos
.query_pools()
.create_or_get("most_level_0")
.create_or_get("most_cold")
.await
.unwrap();
let namespace = repos
@ -2774,7 +2775,7 @@ pub(crate) mod test_helpers {
.unwrap();
let shard = repos
.shards()
.create_or_get(&topic, ShardIndex::new(100))
.create_or_get(&topic, ShardIndex::new(88))
.await
.unwrap();
@ -2793,10 +2794,14 @@ pub(crate) mod test_helpers {
// Db has no partition
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
assert!(
partitions.is_empty(),
"Expected no partitions, instead got {:#?}",
partitions,
);
// The DB has 1 partition but it does not have any files
let partition = repos
@ -2806,10 +2811,14 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
assert!(
partitions.is_empty(),
"Expected no partitions, instead got {:#?}",
partitions,
);
// The partition has one deleted file
let parquet_file_params = ParquetFileParams {
@ -2839,10 +2848,14 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
assert!(
partitions.is_empty(),
"Expected no partitions, instead got {:#?}",
partitions,
);
// A partition with one cold file and one hot file
let hot_partition = repos
@ -2869,10 +2882,42 @@ pub(crate) mod test_helpers {
repos.parquet_files().create(hot_file_params).await.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
assert!(
partitions.is_empty(),
"Expected no partitions, instead got {:#?}",
partitions,
);
// A partition that has only one non-deleted level 2 file, should never be returned
let already_compacted_partition = repos
.partitions()
.create_or_get("already_compacted".into(), shard.id, table.id)
.await
.unwrap();
let l2_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: already_compacted_partition.id,
compaction_level: CompactionLevel::Final,
..parquet_file_params.clone()
};
repos
.parquet_files()
.create(l2_file_params.clone())
.await
.unwrap();
let partitions = repos
.parquet_files()
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert!(
partitions.is_empty(),
"Expected no partitions, instead got {:#?}",
partitions,
);
// The partition has one non-deleted level 0 file
let l0_file_params = ParquetFileParams {
@ -2886,12 +2931,12 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
// The DB has 2 partitions; both have non-deleted L0 files
// The DB has 3 partitions; 2 have non-deleted L0 files
let another_partition = repos
.partitions()
.create_or_get("two".into(), shard.id, table.id)
@ -2920,15 +2965,15 @@ pub(crate) mod test_helpers {
// Must return 2 partitions
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// They must be in order another_partition (more L0 files), partition
// They must be in order another_partition (more files), partition
assert_eq!(partitions[0].partition_id, another_partition.id); // 2 files
assert_eq!(partitions[1].partition_id, partition.id); // 1 file
// The DB has 3 partitions with non-deleted L0 files
// The DB now has 3 partitions with non-deleted L0 files
let third_partition = repos
.partitions()
.create_or_get("three".into(), shard.id, table.id)
@ -2947,11 +2992,11 @@ pub(crate) mod test_helpers {
// Still return 2 partitions because of the limit num_partitions=2
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// and the first one should still be the one with the most L0 files
// and the first one should still be the one with the most files
assert_eq!(partitions[0].partition_id, another_partition.id);
// The compactor skipped compacting another_partition
@ -2964,7 +3009,7 @@ pub(crate) mod test_helpers {
// another_partition should no longer be selected for compaction
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
@ -2975,6 +3020,37 @@ pub(crate) mod test_helpers {
"Expected partitions not to include {}: {partitions:?}",
another_partition.id
);
// The DB now has 4 partitions, one of which has 3 non-deleted L1 files
let fourth_partition = repos
.partitions()
.create_or_get("four".into(), shard.id, table.id)
.await
.unwrap();
for _ in 0..3 {
let file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: fourth_partition.id,
compaction_level: CompactionLevel::FileNonOverlapped,
..parquet_file_params.clone()
};
repos
.parquet_files()
.create(file_params.clone())
.await
.unwrap();
}
// Still return 2 partitions with the limit num_partitions=2
let partitions = repos
.parquet_files()
.most_cold_files_partitions(shard.id, time_8_hours_ago, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// the first one should now be the one with the most files, which happen to be L1
assert_eq!(partitions[0].partition_id, fourth_partition.id);
// the second one should be the one with 2 files
assert_eq!(partitions[1].partition_id, another_partition.id);
}
async fn test_recent_highest_throughput_partitions(catalog: Arc<dyn Catalog>) {


@ -1232,7 +1232,7 @@ impl ParquetFileRepo for MemTxn {
Ok(partitions)
}
async fn most_level_0_files_partitions(
async fn most_cold_files_partitions(
&mut self,
shard_id: ShardId,
time_in_the_past: Timestamp,
@ -1244,7 +1244,8 @@ impl ParquetFileRepo for MemTxn {
.iter()
.filter(|f| {
f.shard_id == shard_id
&& f.compaction_level == CompactionLevel::Initial
&& (f.compaction_level == CompactionLevel::Initial
|| f.compaction_level == CompactionLevel::FileNonOverlapped)
&& f.to_delete.is_none()
})
.collect::<Vec<_>>();


@ -284,7 +284,7 @@ decorate!(
"parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
"recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, shard_id: ShardId, time_in_the_past: Timestamp, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"most_level_0_files_partitions" = most_level_0_files_partitions(&mut self, shard_id: ShardId, time_in_the_past: Timestamp, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"most_cold_files_partitions" = most_cold_files_partitions(&mut self, shard_id: ShardId, time_in_the_past: Timestamp, num_partitions: usize) -> Result<Vec<PartitionParam>>;
]
);


@ -1776,7 +1776,7 @@ LIMIT $4;
.map_err(|e| Error::SqlxError { source: e })
}
async fn most_level_0_files_partitions(
async fn most_cold_files_partitions(
&mut self,
shard_id: ShardId,
time_in_the_past: Timestamp,
@ -1792,7 +1792,7 @@ SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_
parquet_file.table_id, count(parquet_file.id), max(parquet_file.created_at)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE compaction_level = 0
WHERE (compaction_level = $4 OR compaction_level = $5)
AND to_delete IS NULL
AND shard_id = $1
AND skipped_compactions.partition_id IS NULL
@ -1805,6 +1805,8 @@ LIMIT $3;
.bind(&shard_id) // $1
.bind(time_in_the_past) // $2
.bind(&num_partitions) // $3
.bind(CompactionLevel::Initial) // $4
.bind(CompactionLevel::FileNonOverlapped) // $5
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
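The `$4`/`$5` binds replace the previous hard-coded `compaction_level = 0` predicate. For reference, a sketch of the compaction-level encoding this assumes (values inferred from the old literal `0` and the level 0/1/2 naming in this commit, not copied from the crate):

```rust
#[repr(i16)]
enum CompactionLevel {
    Initial = 0,           // level 0: freshly persisted files
    FileNonOverlapped = 1, // level 1: output of an earlier compaction
    Final = 2,             // level 2: fully compacted; excluded from cold selection
}
```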