feat: catalog query to select partitions with recently created files

pull/24376/head
NGA-TRAN 2023-01-04 15:54:46 -05:00
parent 1088baea3d
commit 72977bf250
5 changed files with 271 additions and 1 deletions

View File

@ -241,7 +241,7 @@ pub enum IngesterMapping {
}
/// Unique ID for a `Partition`
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type, sqlx::FromRow)]
#[sqlx(transparent)]
pub struct PartitionId(i64);

View File

@ -625,6 +625,13 @@ pub trait ParquetFileRepo: Send + Sync {
max_time: Timestamp,
) -> Result<Vec<ParquetFile>>;
/// Select partition for cold/warm/hot compaction
/// These are partition with files created recently (after the specified time_in_the_past)
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
) -> Result<Vec<PartitionId>>;
/// List the most recent highest throughput partition for a given shard, if specified
async fn recent_highest_throughput_partitions(
&mut self,
@ -947,6 +954,7 @@ pub(crate) mod test_helpers {
test_parquet_file(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_0(Arc::clone(&catalog)).await;
test_parquet_file_compaction_level_1(Arc::clone(&catalog)).await;
test_partitions_with_recent_created_files(Arc::clone(&catalog)).await;
test_recent_highest_throughput_partitions(Arc::clone(&catalog)).await;
test_partitions_with_small_l1_file_count(Arc::clone(&catalog)).await;
test_update_to_compaction_level_1(Arc::clone(&catalog)).await;
@ -3684,6 +3692,236 @@ pub(crate) mod test_helpers {
repos.abort().await.unwrap();
}
async fn test_partitions_with_recent_created_files(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos
.topics()
.create_or_get("recent_created_files")
.await
.unwrap();
let pool = repos
.query_pools()
.create_or_get("recent_created_files")
.await
.unwrap();
let namespace = repos
.namespaces()
.create(
"test_partitions_with_recent_created_files",
None,
topic.id,
pool.id,
)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table_for_recent_created_files", namespace.id)
.await
.unwrap();
let shard = repos
.shards()
.create_or_get(&topic, ShardIndex::new(101))
.await
.unwrap();
// param for the tests
let time_now = Timestamp::from(catalog.time_provider().now());
let time_one_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(1));
let time_two_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(2));
let time_three_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(3));
let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5));
// Db has no partition
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert!(partitions.is_empty());
// -----------------
// PARTITION one
// The DB has 1 partition but it does not have any file
let partition1 = repos
.partitions()
.create_or_get("one".into(), shard.id, table.id)
.await
.unwrap();
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
assert!(partitions.is_empty());
// create files for partition one
let parquet_file_params = ParquetFileParams {
shard_id: shard.id,
namespace_id: namespace.id,
table_id: partition1.table_id,
partition_id: partition1.id,
object_store_id: Uuid::new_v4(),
max_sequence_number: SequenceNumber::new(140),
min_time: Timestamp::new(1),
max_time: Timestamp::new(10),
file_size_bytes: 1337,
row_count: 0,
compaction_level: CompactionLevel::Initial,
created_at: time_three_hour_ago,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
};
// create a deleted L0 file that was created 3 hours ago
let delete_l0_file = repos
.parquet_files()
.create(parquet_file_params.clone())
.await
.unwrap();
repos
.parquet_files()
.flag_for_delete(delete_l0_file.id)
.await
.unwrap();
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
// still empty becasue the file was not recently created
assert!(partitions.is_empty());
// create a deleted L0 file that was created 1 hour ago which is recently
let l0_one_hour_ago_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_one_hour_ago,
..parquet_file_params.clone()
};
repos
.parquet_files()
.create(l0_one_hour_ago_file_params.clone())
.await
.unwrap();
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
// partition one should be returned
assert_eq!(partitions.len(), 1);
assert!(partitions.contains(&partition1.id));
// -----------------
// PARTITION two
// Partition two without any file
let partition2 = repos
.partitions()
.create_or_get("two".into(), shard.id, table.id)
.await
.unwrap();
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
// should return partittion one only
assert_eq!(partitions.len(), 1);
assert!(partitions.contains(&partition1.id));
// Add a L0 file created non-recently (5 hours ago)
let l0_five_hour_ago_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_five_hour_ago,
partition_id: partition2.id,
..parquet_file_params.clone()
};
repos
.parquet_files()
.create(l0_five_hour_ago_file_params.clone())
.await
.unwrap();
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
// still return partittione one only
assert_eq!(partitions.len(), 1);
assert!(partitions.contains(&partition1.id));
// Add a L1 created recently (just now)
let l1_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_now,
partition_id: partition2.id,
compaction_level: CompactionLevel::FileNonOverlapped,
..parquet_file_params.clone()
};
repos
.parquet_files()
.create(l1_file_params.clone())
.await
.unwrap();
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
// should return both partitions
assert_eq!(partitions.len(), 2);
assert!(partitions.contains(&partition1.id));
assert!(partitions.contains(&partition2.id));
// -----------------
// PARTITION three
// Partition two without any file
let partition3 = repos
.partitions()
.create_or_get("three".into(), shard.id, table.id)
.await
.unwrap();
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
// should return partittion one and two only
assert_eq!(partitions.len(), 2);
assert!(partitions.contains(&partition1.id));
assert!(partitions.contains(&partition2.id));
// add an L0 file created recently (one hour ago)
let l0_one_hour_ago_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_one_hour_ago,
partition_id: partition3.id,
..parquet_file_params.clone()
};
repos
.parquet_files()
.create(l0_one_hour_ago_file_params.clone())
.await
.unwrap();
let partitions = repos
.parquet_files()
.partitions_with_recent_created_files(time_two_hour_ago)
.await
.unwrap();
// should return all partitions
assert_eq!(partitions.len(), 3);
assert!(partitions.contains(&partition1.id));
assert!(partitions.contains(&partition2.id));
assert!(partitions.contains(&partition3.id));
// drop the namespace to avoid the crearted data in this tests from affacting other tests
repos
.namespaces()
.delete("test_partitions_with_recent_created_files")
.await
.expect("delete namespace should succeed");
}
async fn test_recent_highest_throughput_partitions(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos

View File

@ -1317,6 +1317,20 @@ impl ParquetFileRepo for MemTxn {
.cloned()
.collect())
}
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
) -> Result<Vec<PartitionId>> {
let stage = self.stage();
let partitions: Vec<_> = stage
.parquet_files
.iter()
.filter(|f| f.created_at > time_in_the_past)
.map(|f| f.partition_id)
.collect();
Ok(partitions)
}
async fn recent_highest_throughput_partitions(
&mut self,

View File

@ -288,6 +288,7 @@ decorate!(
"parquet_count_by_overlaps_with_level_0" = count_by_overlaps_with_level_0(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
"parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
"partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp) -> Result<Vec<PartitionId>>;
"recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"parquet_partitions_with_small_l1_file_count" = partitions_with_small_l1_file_count(&mut self, shard_id: Option<ShardId>, small_size_threshold_bytes: i64, min_small_file_count: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"most_cold_files_partitions" = most_cold_files_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, num_partitions: usize) -> Result<Vec<PartitionParam>>;

View File

@ -1876,6 +1876,23 @@ WHERE parquet_file.shard_id = $1
.map_err(|e| Error::SqlxError { source: e })
}
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
) -> Result<Vec<PartitionId>> {
sqlx::query_as::<_, PartitionId>(
r#"
SELECT distinct partition_id
FROM parquet_file
WHERE created_at > $1;
"#,
)
.bind(time_in_the_past) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn recent_highest_throughput_partitions(
&mut self,
shard_id: Option<ShardId>,