feat: function to read partition IDs of all partitions with new writes (#6613)

* feat: function to read partition IDs of all partitions with new writes

* chore: run fmt

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Nga Tran 2023-01-18 11:30:15 -05:00 committed by GitHub
parent 3608d2881b
commit 7a5fdd1d95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 111 additions and 0 deletions

View File

@ -521,6 +521,9 @@ pub trait PartitionRepo: Send + Sync {
time_in_the_past: Timestamp,
max_num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
/// Select partitions to compact
async fn partitions_to_compact(&mut self, recent_time: Timestamp) -> Result<Vec<PartitionId>>;
}
/// Functions for working with tombstones in the catalog
@ -3888,6 +3891,13 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert!(partitions.is_empty());
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert!(partitions.is_empty());
// create a deleted L0 file that was created 1 hour ago which is recently
let l0_one_hour_ago_file_params = ParquetFileParams {
@ -3909,6 +3919,14 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0], partition1.id);
// -----------------
// PARTITION two
@ -3927,6 +3945,14 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0], partition1.id);
// Add a L0 file created non-recently (5 hours ago)
let l0_five_hour_ago_file_params = ParquetFileParams {
@ -3949,6 +3975,14 @@ pub(crate) mod test_helpers {
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0], partition1.id);
// Add a L1 created recently (just now)
let l1_file_params = ParquetFileParams {
@ -3976,6 +4010,18 @@ pub(crate) mod test_helpers {
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
let mut partitions = partitions;
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// -----------------
// PARTITION three
@ -3998,6 +4044,18 @@ pub(crate) mod test_helpers {
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
let mut partitions = partitions;
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// Add a L2 created recently (just now) for partition three
// Since it is L2, the partition won't get updated
@ -4026,6 +4084,18 @@ pub(crate) mod test_helpers {
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
let mut partitions = partitions;
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// add an L0 file created recently (one hour ago) for partition three
let l0_one_hour_ago_file_params = ParquetFileParams {
@ -4053,6 +4123,19 @@ pub(crate) mod test_helpers {
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
assert_eq!(partitions[2].partition_id, partition3.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_to_compact(time_two_hour_ago)
.await
.unwrap();
assert_eq!(partitions.len(), 3);
// sort by partition id
let mut partitions = partitions;
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
assert_eq!(partitions[2], partition3.id);
// Limit max num partition
let partitions = repos

View File

@ -1019,6 +1019,19 @@ impl PartitionRepo for MemTxn {
Ok(partitions)
}
async fn partitions_to_compact(&mut self, recent_time: Timestamp) -> Result<Vec<PartitionId>> {
let stage = self.stage();
let partitions: Vec<_> = stage
.partitions
.iter()
.filter(|p| p.new_file_at > Some(recent_time))
.map(|p| p.id)
.collect();
Ok(partitions)
}
}
#[async_trait]

View File

@ -253,6 +253,7 @@ decorate!(
"partition_update_persisted_sequence_number" = update_persisted_sequence_number(&mut self, partition_id: PartitionId, sequence_number: SequenceNumber) -> Result<()>;
"partition_most_recent_n" = most_recent_n(&mut self, n: usize, shards: &[ShardId]) -> Result<Vec<Partition>>;
"partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp, max_num_partitions: usize) -> Result<Vec<PartitionParam>>;
"partitions_to_compact" = partitions_to_compact(&mut self, recent_time: Timestamp) -> Result<Vec<PartitionId>>;
]
);

View File

@ -1434,6 +1434,20 @@ WHERE id = $2;
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn partitions_to_compact(&mut self, recent_time: Timestamp) -> Result<Vec<PartitionId>> {
sqlx::query_as(
r#"
SELECT p.id as partition_id
FROM partition p
WHERE p.new_file_at > $1
"#,
)
.bind(recent_time) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
#[async_trait]