refactor: faster PG `ParquetFileRepo` (#7907)

* refactor: remove `ParquetFileRepo::flag_for_delete`

* refactor: batch update parquet files in catalog

* refactor: avoid data roundtrips through postgres

* refactor: do not return ID from PG when we do not need it

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-06-01 18:17:28 +02:00 committed by GitHub
parent 72ff001d33
commit 86a2c249ec
6 changed files with 42 additions and 58 deletions
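
The four bullets above describe the plan; the hunks below carry it out. The heart of the speedup is the batching bullet: per-file UPDATE statements become a single statement over a bound id array. A minimal sketch of that pattern, assuming a plain `sqlx::PgPool`, raw `i64` ids in place of the catalog's `ParquetFileId` newtype, and a raw `i64` timestamp in place of its `Timestamp` wrapper:

    use sqlx::PgPool;

    /// Flag a batch of files in one statement. Postgres expands `ANY($2)`
    /// over the bound int8[] array, so N per-row roundtrips collapse into
    /// a single network exchange regardless of batch size.
    async fn flag_batch(pool: &PgPool, marked_at: i64, ids: &[i64]) -> Result<(), sqlx::Error> {
        sqlx::query("UPDATE parquet_file SET to_delete = $1 WHERE id = ANY($2);")
            .bind(marked_at) // $1
            .bind(ids)       // $2: a `&[i64]` binds as a Postgres int8[] array
            .execute(pool)
            .await?;
        Ok(())
    }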


@@ -448,9 +448,6 @@ pub trait ParquetFileRepo: Send + Sync {
     /// This is mostly useful for testing and will likely not succeed in production.
     async fn list_all(&mut self) -> Result<Vec<ParquetFile>>;
 
-    /// Flag the parquet file for deletion
-    async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>;
-
     /// Flag all parquet files for deletion that are older than their namespace's retention period.
     async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>>;
@@ -1755,7 +1752,7 @@ pub(crate) mod test_helpers {
         // verify to_delete can be updated to a timestamp
         repos
             .parquet_files()
-            .flag_for_delete(parquet_file.id)
+            .create_upgrade_delete(&[parquet_file.id], &[], &[], CompactionLevel::Initial)
             .await
             .unwrap();
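
(Here and in the remaining test hunks, flagging one file for deletion becomes a `create_upgrade_delete` call whose first slice is the delete set; the two empty slices are the upgrade and create sets, and the trailing `CompactionLevel::Initial` is the target level for the empty upgrade set, so it is inert in these calls. The argument order is visible in the PostgresTxn hunk further down, where the implementation forwards `delete`, `upgrade`, and `create` in that order.)
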
@@ -1883,7 +1880,11 @@ pub(crate) mod test_helpers {
             .unwrap();
         assert_eq!(vec![f1.clone(), f2.clone(), f3.clone()], files);
 
-        repos.parquet_files().flag_for_delete(f2.id).await.unwrap();
+        repos
+            .parquet_files()
+            .create_upgrade_delete(&[f2.id], &[], &[], CompactionLevel::Initial)
+            .await
+            .unwrap();
         let files = repos
             .parquet_files()
             .list_by_namespace_not_to_delete(namespace2.id)
@@ -2235,7 +2236,7 @@ pub(crate) mod test_helpers {
             .unwrap();
         repos
             .parquet_files()
-            .flag_for_delete(delete_l0_file.id)
+            .create_upgrade_delete(&[delete_l0_file.id], &[], &[], CompactionLevel::Initial)
             .await
             .unwrap();
         let partitions = repos
@@ -2587,7 +2588,7 @@ pub(crate) mod test_helpers {
             .unwrap();
         repos
             .parquet_files()
-            .flag_for_delete(delete_file.id)
+            .create_upgrade_delete(&[delete_file.id], &[], &[], CompactionLevel::Initial)
             .await
             .unwrap();
         let level1_file_params = ParquetFileParams {


@@ -721,11 +721,6 @@ impl ParquetFileRepo for MemTxn {
         Ok(stage.parquet_files.clone())
     }
 
-    async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
-        let marked_at = Timestamp::from(self.time_provider.now());
-        flag_for_delete(self.stage(), id, marked_at).await
-    }
-
     async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>> {
         let now = Timestamp::from(self.time_provider.now());
         let stage = self.stage();


@@ -187,7 +187,6 @@ decorate!(
     methods = [
         "parquet_create" = create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile>;
         "parquet_list_all" = list_all(&mut self) -> Result<Vec<ParquetFile>>;
-        "parquet_flag_for_delete" = flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>;
         "parquet_flag_for_delete_by_retention" = flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>>;
         "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
         "parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;


@@ -1271,7 +1271,8 @@ RETURNING *
 impl ParquetFileRepo for PostgresTxn {
     async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile> {
         let executor = &mut self.inner;
-        create_parquet_file(executor, parquet_file_params).await
+        let id = create_parquet_file(executor, &parquet_file_params).await?;
+        Ok(ParquetFile::from_params(parquet_file_params, id))
     }
 
     async fn list_all(&mut self) -> Result<Vec<ParquetFile>> {
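
This hunk is the "avoid data roundtrips" bullet in action: the INSERT now returns only the generated id, and the full `ParquetFile` is rebuilt locally from the params the caller already holds. A self-contained sketch of the same shape, using a hypothetical two-column `file` table and `String` object-store ids so it stands alone:

    use sqlx::PgPool;

    struct FileParams { object_store_id: String, file_size_bytes: i64 }
    struct File { id: i64, object_store_id: String, file_size_bytes: i64 }

    impl File {
        /// The caller already owns every column value it sent to Postgres,
        /// so the full row can be rebuilt locally from the params plus the id.
        fn from_params(p: FileParams, id: i64) -> Self {
            Self { id, object_store_id: p.object_store_id, file_size_bytes: p.file_size_bytes }
        }
    }

    async fn create(pool: &PgPool, p: FileParams) -> Result<File, sqlx::Error> {
        // `RETURNING id` ships one column back instead of the whole row.
        let id: i64 = sqlx::query_scalar(
            "INSERT INTO file (object_store_id, file_size_bytes) VALUES ($1, $2) RETURNING id;",
        )
        .bind(&p.object_store_id) // $1
        .bind(p.file_size_bytes)  // $2
        .fetch_one(pool)
        .await?;
        Ok(File::from_params(p, id))
    }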
@@ -1291,13 +1292,6 @@ FROM parquet_file;
         .map_err(|e| Error::SqlxError { source: e })
     }
 
-    async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
-        let marked_at = Timestamp::from(self.time_provider.now());
-        let executor = &mut self.inner;
-
-        flag_for_delete(executor, id, marked_at).await
-    }
-
     async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>> {
         let flagged_at = Timestamp::from(self.time_provider.now());
         // TODO - include check of table retention period once implemented
@@ -1461,16 +1455,14 @@ WHERE object_store_id = $1;
             .map_err(|e| Error::StartTransaction { source: e })?;
 
         let marked_at = Timestamp::from(self.time_provider.now());
-        for id in delete {
-            flag_for_delete(&mut tx, *id, marked_at).await?;
-        }
+        flag_for_delete(&mut tx, delete, marked_at).await?;
 
         update_compaction_level(&mut tx, upgrade, target_level).await?;
 
         let mut ids = Vec::with_capacity(create.len());
         for file in create {
-            let pf = create_parquet_file(&mut tx, file.clone()).await?;
-            ids.push(pf.id);
+            let id = create_parquet_file(&mut tx, file).await?;
+            ids.push(id);
         }
 
         tx.commit()
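
With the per-id loop gone, the deletes cost one statement for the whole set and the upgrades one more; only the creates still iterate, since each INSERT returns a distinct generated id. Everything remains inside the one transaction, so the batch stays atomic while the statement count stops growing with the delete set.
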
@@ -1484,8 +1476,8 @@ WHERE object_store_id = $1;
 // They are also used by the respective create/flag_for_delete/update_compaction_level methods.
 async fn create_parquet_file<'q, E>(
     executor: E,
-    parquet_file_params: ParquetFileParams,
-) -> Result<ParquetFile>
+    parquet_file_params: &ParquetFileParams,
+) -> Result<ParquetFileId>
 where
     E: Executor<'q, Database = Postgres>,
 {
@@ -1504,17 +1496,14 @@ where
         max_l0_created_at,
     } = parquet_file_params;
 
-    let query = sqlx::query_as::<_, ParquetFile>(
+    let query = sqlx::query_scalar::<_, ParquetFileId>(
         r#"
 INSERT INTO parquet_file (
     shard_id, table_id, partition_id, object_store_id,
     min_time, max_time, file_size_bytes,
     row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at )
 VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 )
-RETURNING
-    id, table_id, partition_id, object_store_id,
-    min_time, max_time, to_delete, file_size_bytes,
-    row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at;
+RETURNING id;
 "#,
     )
     .bind(TRANSITION_SHARD_ID) // $1
@@ -1530,9 +1519,11 @@ RETURNING
     .bind(namespace_id) // $11
     .bind(column_set) // $12
     .bind(max_l0_created_at); // $13
 
-    let parquet_file = query.fetch_one(executor).await.map_err(|e| {
+    let parquet_file_id = query.fetch_one(executor).await.map_err(|e| {
         if is_unique_violation(&e) {
-            Error::FileExists { object_store_id }
+            Error::FileExists {
+                object_store_id: *object_store_id,
+            }
         } else if is_fk_violation(&e) {
             Error::ForeignKeyViolation { source: e }
         } else {
@@ -1540,16 +1531,23 @@ RETURNING
         }
     })?;
 
-    Ok(parquet_file)
+    Ok(parquet_file_id)
 }
 
-async fn flag_for_delete<'q, E>(executor: E, id: ParquetFileId, marked_at: Timestamp) -> Result<()>
+async fn flag_for_delete<'q, E>(
+    executor: E,
+    ids: &[ParquetFileId],
+    marked_at: Timestamp,
+) -> Result<()>
 where
     E: Executor<'q, Database = Postgres>,
 {
-    let query = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#)
+    // If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
+    // See https://github.com/launchbadge/sqlx/issues/1744
+    let ids: Vec<_> = ids.iter().map(|p| p.get()).collect();
+
+    let query = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = ANY($2);"#)
         .bind(marked_at) // $1
-        .bind(id); // $2
+        .bind(&ids[..]); // $2
 
     query
         .execute(executor)
         .await
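
The new in-code comment points at launchbadge/sqlx#1744: sqlx will not bind a slice of newtype ids as a Postgres array, so the ids are unwrapped to `i64` first. A self-contained sketch of that workaround, with a stand-in `FileId` newtype rather than the real `ParquetFileId`:

    /// Stand-in for the catalog's id newtype.
    #[derive(Clone, Copy)]
    struct FileId(i64);

    impl FileId {
        fn get(&self) -> i64 {
            self.0
        }
    }

    /// sqlx can bind `&[i64]` as int8[], but not `&[FileId]` (sqlx#1744),
    /// so the ids are copied into a plain Vec<i64> before binding. The extra
    /// allocation is trivial next to the per-row roundtrips it replaces.
    fn to_raw(ids: &[FileId]) -> Vec<i64> {
        ids.iter().map(|id| id.get()).collect()
    }
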
@@ -1562,7 +1560,7 @@ async fn update_compaction_level<'q, E>(
     executor: E,
     parquet_file_ids: &[ParquetFileId],
     compaction_level: CompactionLevel,
-) -> Result<Vec<ParquetFileId>>
+) -> Result<()>
 where
     E: Executor<'q, Database = Postgres>,
 {
@@ -1573,19 +1571,17 @@ where
         r#"
 UPDATE parquet_file
 SET compaction_level = $1
-WHERE id = ANY($2)
-RETURNING id;
+WHERE id = ANY($2);
 "#,
     )
     .bind(compaction_level) // $1
     .bind(&ids[..]); // $2
 
-    let updated = query
-        .fetch_all(executor)
+    query
+        .execute(executor)
         .await
         .map_err(|e| Error::SqlxError { source: e })?;
 
-    let updated = updated.into_iter().map(|row| row.get("id")).collect();
-    Ok(updated)
+    Ok(())
 }
 
 /// The error code returned by Postgres for a unique constraint violation.
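
Dropping `RETURNING id` here is the last bullet: `create_upgrade_delete` already knows which ids it passed in, so echoing them back was pure overhead, and `execute` replaces `fetch_all` plus per-row decoding. A sketch of the trimmed-down call, again with raw `i64` ids and an assumed pool; the affected-row count is still available should a caller ever want confirmation:

    use sqlx::PgPool;

    async fn set_level(pool: &PgPool, level: i16, ids: &[i64]) -> Result<u64, sqlx::Error> {
        let res = sqlx::query("UPDATE parquet_file SET compaction_level = $1 WHERE id = ANY($2);")
            .bind(level) // $1
            .bind(ids)   // $2
            .execute(pool)
            .await?;
        // `rows_affected` reports how many files actually changed level.
        Ok(res.rows_affected())
    }
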
@@ -2173,7 +2169,7 @@ mod tests {
         // flag f1 for deletion and assert that the total file size is reduced accordingly.
         repos
             .parquet_files()
-            .flag_for_delete(f1.id)
+            .create_upgrade_delete(&[f1.id], &[], &[], CompactionLevel::Initial)
             .await
             .expect("flag parquet file for deletion should succeed");
         let total_file_size_bytes: i64 =


@@ -1143,13 +1143,6 @@ FROM parquet_file;
         .collect())
     }
 
-    async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> {
-        let marked_at = Timestamp::from(self.time_provider.now());
-        let executor = self.inner.get_mut();
-
-        flag_for_delete(executor, id, marked_at).await
-    }
-
     async fn flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>> {
         let flagged_at = Timestamp::from(self.time_provider.now());
         // TODO - include check of table retention period once implemented
@@ -1776,7 +1769,7 @@ mod tests {
         // flag f1 for deletion and assert that the total file size is reduced accordingly.
         repos
             .parquet_files()
-            .flag_for_delete(f1.id)
+            .create_upgrade_delete(&[f1.id], &[], &[], CompactionLevel::Initial)
             .await
             .expect("flag parquet file for deletion should succeed");
         let total_file_size_bytes: i64 =


@@ -588,7 +588,7 @@ impl TestPartition {
         if to_delete {
             repos
                 .parquet_files()
-                .flag_for_delete(parquet_file.id)
+                .create_upgrade_delete(&[parquet_file.id], &[], &[], CompactionLevel::Initial)
                 .await
                 .unwrap();
         }
@@ -817,9 +817,9 @@ impl TestParquetFile {
         repos
             .parquet_files()
-            .flag_for_delete(self.parquet_file.id)
+            .create_upgrade_delete(&[self.parquet_file.id], &[], &[], CompactionLevel::Initial)
             .await
-            .unwrap()
+            .unwrap();
     }
 
     /// Get Parquet file schema.