perf: optimize not to update partitions with newly created level 2 files (#6590)
* perf: optimize not to update partitions with newly created level 2 files
* chore: cleanup

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>

parent 677de283ea
commit 550cea8bc5
@@ -918,7 +918,7 @@ pub mod tests {
        let pf1 = txn.parquet_files().create(p1.clone()).await.unwrap();
        txn.parquet_files().flag_for_delete(pf1.id).await.unwrap();
        //
        // partition2 has a cold L2 file created recently (default 9 hours ago) --> a cold candidate
        // partition2 has only a L2 file created recently (default 9 hours ago) --> not a candidate
        let p2 = ParquetFileParams {
            object_store_id: Uuid::new_v4(),
            partition_id: partition2.id,
@@ -927,23 +927,13 @@ pub mod tests {
        };
        let _pf2 = txn.parquet_files().create(p2).await.unwrap();
        txn.commit().await.unwrap();
        // 1 candidate because we limit it
        let candidates = compactor
            .partitions_to_compact(compaction_type, vec![hour_threshold], 1)
            .await
            .unwrap();
        assert_eq!(candidates.len(), 1);
        // 2 candidates because we do not limit it
        // only partition 1 is returned as a candidate
        let candidates = compactor
            .partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions)
            .await
            .unwrap();
        assert_eq!(candidates.len(), 2);
        // sort candidates on partition_id
        let mut candidates = candidates.into_iter().collect::<Vec<_>>();
        candidates.sort_by_key(|c| c.candidate.partition_id);
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].candidate.partition_id, partition1.id);
        assert_eq!(candidates[1].candidate.partition_id, partition2.id);
        //
        // verify no candidates will actually get cold compaction because they are not old enough
        let files = parquet_file_lookup::ParquetFilesForCompaction::for_partition(
@@ -954,15 +944,6 @@ pub mod tests {
            .await
            .unwrap();
        assert!(files.is_none());
        //
        let files = parquet_file_lookup::ParquetFilesForCompaction::for_partition(
            Arc::clone(&compactor),
            Arc::clone(&candidates[1]),
            CompactionType::Cold,
        )
        .await
        .unwrap();
        assert!(files.is_none());

        // --------------------------------------
        // Case 3: no new recent writes (within the last 8 hours) --> return that partition
@@ -977,7 +958,7 @@ pub mod tests {
        let _pf3 = txn.parquet_files().create(p3).await.unwrap();
        txn.commit().await.unwrap();
        //
        // Still return 2 candidates
        // Return 2 candidates
        let candidates = compactor
            .partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions)
            .await
@@ -1208,5 +1189,15 @@ pub mod tests {
        assert_eq!(candidates[3].candidate.partition_id, partition4.id);
        assert_eq!(candidates[4].candidate.partition_id, partition5.id);
        assert_eq!(candidates[5].candidate.partition_id, another_partition.id);

        // --------------------------------------
        // Test limit of number of partitions to compact
        let candidates = compactor
            .partitions_to_compact(compaction_type, vec![hour_threshold], 3)
            .await
            .unwrap();
        // Since we do not prioritize partitions (for now), we only need to check the number of
        // candidates. The goal is to keep this limit high so we do not have to think about prioritization.
        assert_eq!(candidates.len(), 3);
    }
}
@@ -0,0 +1,16 @@
-- FUNCTION that updates the new_file_at field in the partition table when the update_partition trigger is fired
-- The field new_file_at signals when the last file was added to the partition for compaction. However,
-- since compaction level 2 is the final stage, its creation should not signal further compaction

CREATE OR REPLACE FUNCTION update_partition_on_new_file_at()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $$
BEGIN
    IF NEW.compaction_level < 2 THEN
        UPDATE partition SET new_file_at = NEW.created_at WHERE id = NEW.partition_id;
    END IF;

    RETURN NEW;
END;
$$;
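The migration above only (re)defines the trigger function; the `update_partition` trigger it mentions is not shown in this excerpt. For context, a hypothetical sketch of the trigger DDL that would invoke it (the AFTER INSERT timing and FOR EACH ROW granularity are assumptions, not confirmed by this diff):

```sql
-- Hypothetical sketch: the trigger that would fire the function above on
-- every new parquet_file row. Timing and granularity are assumed here.
CREATE TRIGGER update_partition
    AFTER INSERT ON parquet_file
    FOR EACH ROW
    EXECUTE FUNCTION update_partition_on_new_file_at();
```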
@@ -634,15 +634,6 @@ pub trait ParquetFileRepo: Send + Sync {
        max_time: Timestamp,
    ) -> Result<Vec<ParquetFile>>;

    // Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518
    /// Select partitions for cold/warm/hot compaction.
    /// These are partitions with files created recently (i.e. created after the specified time_in_the_past).
    /// These files include all compaction levels and both non-deleted and soft-deleted files.
    async fn partitions_with_recent_created_files(
        &mut self,
        time_in_the_past: Timestamp,
    ) -> Result<Vec<PartitionId>>;

    // Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518
    /// List the most recent highest throughput partition for a given shard, if specified
    async fn recent_highest_throughput_partitions(
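For orientation before the test walkthrough below, a minimal usage sketch of the method being removed; `repos` and `catalog` are handles as used in the test helpers later in this diff, so this fragment is illustrative rather than self-contained:

```rust
// Illustrative fragment only (mirrors the test helpers below): list the
// partitions that received any parquet file within the last two hours.
let time_two_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(2));
let recent_partition_ids = repos
    .parquet_files()
    .partitions_with_recent_created_files(time_two_hour_ago)
    .await
    .unwrap();
```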
@@ -3837,12 +3828,6 @@ pub(crate) mod test_helpers {
        let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5));

        // Db has no partition
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        assert!(partitions.is_empty());
        // get from partition table
        let partitions = repos
            .partitions()
@@ -3859,12 +3844,6 @@ pub(crate) mod test_helpers {
            .create_or_get("one".into(), shard.id, table.id)
            .await
            .unwrap();
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        assert!(partitions.is_empty());
        // get from partition table
        let partitions = repos
            .partitions()
@@ -3902,13 +3881,6 @@ pub(crate) mod test_helpers {
            .flag_for_delete(delete_l0_file.id)
            .await
            .unwrap();
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        // still empty because the file was not recently created
        assert!(partitions.is_empty());
        // get from partition table
        let partitions = repos
            .partitions()
@@ -3928,14 +3900,7 @@ pub(crate) mod test_helpers {
            .create(l0_one_hour_ago_file_params.clone())
            .await
            .unwrap();
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        // partition one should be returned
        assert_eq!(partitions.len(), 1);
        assert!(partitions.contains(&partition1.id));
        // get from partition table
        let partitions = repos
            .partitions()
@@ -3953,14 +3918,7 @@ pub(crate) mod test_helpers {
            .create_or_get("two".into(), shard.id, table.id)
            .await
            .unwrap();
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        // should return partition one only
        assert_eq!(partitions.len(), 1);
        assert!(partitions.contains(&partition1.id));
        // get from partition table
        let partitions = repos
            .partitions()
@@ -3982,14 +3940,7 @@ pub(crate) mod test_helpers {
            .create(l0_five_hour_ago_file_params.clone())
            .await
            .unwrap();
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        // still returns partition one only
        assert_eq!(partitions.len(), 1);
        assert!(partitions.contains(&partition1.id));
        // get from partition table
        let partitions = repos
            .partitions()
@@ -4012,15 +3963,7 @@ pub(crate) mod test_helpers {
            .create(l1_file_params.clone())
            .await
            .unwrap();
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        // should return both partitions
        assert_eq!(partitions.len(), 2);
        assert!(partitions.contains(&partition1.id));
        assert!(partitions.contains(&partition2.id));
        // get from partition table
        let partitions = repos
            .partitions()
@@ -4042,15 +3985,7 @@ pub(crate) mod test_helpers {
            .create_or_get("three".into(), shard.id, table.id)
            .await
            .unwrap();
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        // should return partition one and two only
        assert_eq!(partitions.len(), 2);
        assert!(partitions.contains(&partition1.id));
        assert!(partitions.contains(&partition2.id));
        // get from partition table
        let partitions = repos
            .partitions()
@@ -4064,7 +3999,35 @@ pub(crate) mod test_helpers {
        assert_eq!(partitions[0].partition_id, partition1.id);
        assert_eq!(partitions[1].partition_id, partition2.id);

        // add an L0 file created recently (one hour ago)
        // Add an L2 file created recently (just now) for partition three
        // Since it is L2, the partition won't get updated
        let l2_file_params = ParquetFileParams {
            object_store_id: Uuid::new_v4(),
            created_at: time_now,
            partition_id: partition3.id,
            compaction_level: CompactionLevel::Final,
            ..parquet_file_params.clone()
        };
        repos
            .parquet_files()
            .create(l2_file_params.clone())
            .await
            .unwrap();
        // should still return partition one and two only
        // get from partition table
        let partitions = repos
            .partitions()
            .partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
            .await
            .unwrap();
        assert_eq!(partitions.len(), 2);
        // sort by partition id
        let mut partitions = partitions;
        partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
        assert_eq!(partitions[0].partition_id, partition1.id);
        assert_eq!(partitions[1].partition_id, partition2.id);

        // add an L0 file created recently (one hour ago) for partition three
        let l0_one_hour_ago_file_params = ParquetFileParams {
            object_store_id: Uuid::new_v4(),
            created_at: time_one_hour_ago,
@@ -4076,16 +4039,7 @@ pub(crate) mod test_helpers {
            .create(l0_one_hour_ago_file_params.clone())
            .await
            .unwrap();
        let partitions = repos
            .parquet_files()
            .partitions_with_recent_created_files(time_two_hour_ago)
            .await
            .unwrap();
        // should return all partitions
        assert_eq!(partitions.len(), 3);
        assert!(partitions.contains(&partition1.id));
        assert!(partitions.contains(&partition2.id));
        assert!(partitions.contains(&partition3.id));
        // get from partition table
        let partitions = repos
            .partitions()
@@ -1195,15 +1195,19 @@ impl ParquetFileRepo for MemTxn {
            created_at,
            column_set,
        };
        let compaction_level = parquet_file.compaction_level;
        stage.parquet_files.push(parquet_file);

        // Update the new_file_at field of its partition to the time of created_at
        let partition = stage
            .partitions
            .iter_mut()
            .find(|p| p.id == partition_id)
            .ok_or(Error::PartitionNotFound { id: partition_id })?;
        partition.new_file_at = Some(created_at);
        // Only update if the compaction level is not Final, which would signal more compaction is needed
        if compaction_level < CompactionLevel::Final {
            let partition = stage
                .partitions
                .iter_mut()
                .find(|p| p.id == partition_id)
                .ok_or(Error::PartitionNotFound { id: partition_id })?;
            partition.new_file_at = Some(created_at);
        }

        Ok(stage.parquet_files.last().unwrap().clone())
    }
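The in-memory catalog now gates the `new_file_at` bump on the file's compaction level, matching the SQL trigger function earlier in this diff. As a standalone illustration of the comparison it relies on, here is a minimal sketch; the enum below is a hypothetical stand-in for the catalog's real `CompactionLevel`, which is assumed to order Initial < FileNonOverlapped < Final:

```rust
// Hypothetical stand-in for the catalog's CompactionLevel. The ordering
// derive is what makes `level < CompactionLevel::Final` mean
// "this file can still be compacted further".
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum CompactionLevel {
    Initial,           // L0: freshly ingested file
    FileNonOverlapped, // L1: compacted once, may be compacted again
    Final,             // L2: terminal level; must not re-trigger compaction
}

/// Only files below the Final level should refresh a partition's new_file_at.
fn should_bump_new_file_at(level: CompactionLevel) -> bool {
    level < CompactionLevel::Final
}

fn main() {
    assert!(should_bump_new_file_at(CompactionLevel::Initial));
    assert!(should_bump_new_file_at(CompactionLevel::FileNonOverlapped));
    assert!(!should_bump_new_file_at(CompactionLevel::Final));
}
```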
@@ -1359,20 +1363,6 @@ impl ParquetFileRepo for MemTxn {
            .cloned()
            .collect())
    }

    async fn partitions_with_recent_created_files(
        &mut self,
        time_in_the_past: Timestamp,
    ) -> Result<Vec<PartitionId>> {
        let stage = self.stage();

        let partitions: Vec<_> = stage
            .parquet_files
            .iter()
            .filter(|f| f.created_at > time_in_the_past)
            .map(|f| f.partition_id)
            .collect();
        Ok(partitions)
    }

    async fn recent_highest_throughput_partitions(
        &mut self,
@@ -289,7 +289,6 @@ decorate!(
    "parquet_count_by_overlaps_with_level_0" = count_by_overlaps_with_level_0(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
    "parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>;
    "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
    "partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp) -> Result<Vec<PartitionId>>;
    "recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
    "parquet_partitions_with_small_l1_file_count" = partitions_with_small_l1_file_count(&mut self, shard_id: Option<ShardId>, small_size_threshold_bytes: i64, min_small_file_count: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
    "most_cold_files_partitions" = most_cold_files_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, num_partitions: usize) -> Result<Vec<PartitionParam>>;
@@ -1897,23 +1897,6 @@ WHERE parquet_file.shard_id = $1
        .map_err(|e| Error::SqlxError { source: e })
    }

    async fn partitions_with_recent_created_files(
        &mut self,
        time_in_the_past: Timestamp,
    ) -> Result<Vec<PartitionId>> {
        sqlx::query_as::<_, PartitionId>(
            r#"
SELECT distinct partition_id
FROM parquet_file
WHERE created_at > $1;
            "#,
        )
        .bind(time_in_the_past) // $1
        .fetch_all(&mut self.inner)
        .await
        .map_err(|e| Error::SqlxError { source: e })
    }

    async fn recent_highest_throughput_partitions(
        &mut self,
        shard_id: Option<ShardId>,
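The removed query above scanned every parquet_file row for recent `created_at` values. The tests in this diff instead call a replacement on the partition repo, `partitions().partitions_with_recent_created_files(time, max_num_partition)`, whose SQL is not part of this excerpt; a hypothetical sketch of what such a query would look like against the trigger-maintained `new_file_at` column:

```sql
-- Hypothetical sketch (the real replacement query is not shown in this diff):
-- read the trigger-maintained new_file_at column instead of scanning parquet_file.
SELECT id AS partition_id
FROM partition
WHERE new_file_at > $1     -- time_in_the_past
LIMIT $2;                  -- max_num_partition
```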