From 86a2c249ecba22f73879ad66630a42e7d6273a8b Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 1 Jun 2023 18:17:28 +0200 Subject: [PATCH] refactor: faster PG `ParquetFileRepo` (#7907) * refactor: remove `ParquetFileRepo::flag_for_delete` * refactor: batch update parquet files in catalog * refactor: avoid data roundtrips through postgres * refactor: do not return ID from PG when we do not need it --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- iox_catalog/src/interface.rs | 15 +++++---- iox_catalog/src/mem.rs | 5 --- iox_catalog/src/metrics.rs | 1 - iox_catalog/src/postgres.rs | 64 +++++++++++++++++------------------- iox_catalog/src/sqlite.rs | 9 +---- iox_tests/src/catalog.rs | 6 ++-- 6 files changed, 42 insertions(+), 58 deletions(-) diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index b9055692ce..602bda1c8a 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -448,9 +448,6 @@ pub trait ParquetFileRepo: Send + Sync { /// This is mostly useful for testing and will likely not succeed in production. async fn list_all(&mut self) -> Result>; - /// Flag the parquet file for deletion - async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>; - /// Flag all parquet files for deletion that are older than their namespace's retention period. async fn flag_for_delete_by_retention(&mut self) -> Result>; @@ -1755,7 +1752,7 @@ pub(crate) mod test_helpers { // verify to_delete can be updated to a timestamp repos .parquet_files() - .flag_for_delete(parquet_file.id) + .create_upgrade_delete(&[parquet_file.id], &[], &[], CompactionLevel::Initial) .await .unwrap(); @@ -1883,7 +1880,11 @@ pub(crate) mod test_helpers { .unwrap(); assert_eq!(vec![f1.clone(), f2.clone(), f3.clone()], files); - repos.parquet_files().flag_for_delete(f2.id).await.unwrap(); + repos + .parquet_files() + .create_upgrade_delete(&[f2.id], &[], &[], CompactionLevel::Initial) + .await + .unwrap(); let files = repos .parquet_files() .list_by_namespace_not_to_delete(namespace2.id) @@ -2235,7 +2236,7 @@ pub(crate) mod test_helpers { .unwrap(); repos .parquet_files() - .flag_for_delete(delete_l0_file.id) + .create_upgrade_delete(&[delete_l0_file.id], &[], &[], CompactionLevel::Initial) .await .unwrap(); let partitions = repos @@ -2587,7 +2588,7 @@ pub(crate) mod test_helpers { .unwrap(); repos .parquet_files() - .flag_for_delete(delete_file.id) + .create_upgrade_delete(&[delete_file.id], &[], &[], CompactionLevel::Initial) .await .unwrap(); let level1_file_params = ParquetFileParams { diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 1443362e32..b6e804ffd3 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -721,11 +721,6 @@ impl ParquetFileRepo for MemTxn { Ok(stage.parquet_files.clone()) } - async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> { - let marked_at = Timestamp::from(self.time_provider.now()); - flag_for_delete(self.stage(), id, marked_at).await - } - async fn flag_for_delete_by_retention(&mut self) -> Result> { let now = Timestamp::from(self.time_provider.now()); let stage = self.stage(); diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 6ea7a96b5b..f94375651d 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -187,7 +187,6 @@ decorate!( methods = [ "parquet_create" = create(&mut self, parquet_file_params: ParquetFileParams) -> Result; "parquet_list_all" = list_all(&mut self) -> Result>; - "parquet_flag_for_delete" = flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>; "parquet_flag_for_delete_by_retention" = flag_for_delete_by_retention(&mut self) -> Result>; "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result>; "parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 7e89021136..aae8786ff6 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1271,7 +1271,8 @@ RETURNING * impl ParquetFileRepo for PostgresTxn { async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result { let executor = &mut self.inner; - create_parquet_file(executor, parquet_file_params).await + let id = create_parquet_file(executor, &parquet_file_params).await?; + Ok(ParquetFile::from_params(parquet_file_params, id)) } async fn list_all(&mut self) -> Result> { @@ -1291,13 +1292,6 @@ FROM parquet_file; .map_err(|e| Error::SqlxError { source: e }) } - async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> { - let marked_at = Timestamp::from(self.time_provider.now()); - let executor = &mut self.inner; - - flag_for_delete(executor, id, marked_at).await - } - async fn flag_for_delete_by_retention(&mut self) -> Result> { let flagged_at = Timestamp::from(self.time_provider.now()); // TODO - include check of table retention period once implemented @@ -1461,16 +1455,14 @@ WHERE object_store_id = $1; .map_err(|e| Error::StartTransaction { source: e })?; let marked_at = Timestamp::from(self.time_provider.now()); - for id in delete { - flag_for_delete(&mut tx, *id, marked_at).await?; - } + flag_for_delete(&mut tx, delete, marked_at).await?; update_compaction_level(&mut tx, upgrade, target_level).await?; let mut ids = Vec::with_capacity(create.len()); for file in create { - let pf = create_parquet_file(&mut tx, file.clone()).await?; - ids.push(pf.id); + let id = create_parquet_file(&mut tx, file).await?; + ids.push(id); } tx.commit() @@ -1484,8 +1476,8 @@ WHERE object_store_id = $1; // They are also used by the respective create/flag_for_delete/update_compaction_level methods. async fn create_parquet_file<'q, E>( executor: E, - parquet_file_params: ParquetFileParams, -) -> Result + parquet_file_params: &ParquetFileParams, +) -> Result where E: Executor<'q, Database = Postgres>, { @@ -1504,17 +1496,14 @@ where max_l0_created_at, } = parquet_file_params; - let query = sqlx::query_as::<_, ParquetFile>( + let query = sqlx::query_scalar::<_, ParquetFileId>( r#" INSERT INTO parquet_file ( shard_id, table_id, partition_id, object_store_id, min_time, max_time, file_size_bytes, row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 ) -RETURNING - id, table_id, partition_id, object_store_id, - min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, namespace_id, column_set, max_l0_created_at; +RETURNING id; "#, ) .bind(TRANSITION_SHARD_ID) // $1 @@ -1530,9 +1519,11 @@ RETURNING .bind(namespace_id) // $11 .bind(column_set) // $12 .bind(max_l0_created_at); // $13 - let parquet_file = query.fetch_one(executor).await.map_err(|e| { + let parquet_file_id = query.fetch_one(executor).await.map_err(|e| { if is_unique_violation(&e) { - Error::FileExists { object_store_id } + Error::FileExists { + object_store_id: *object_store_id, + } } else if is_fk_violation(&e) { Error::ForeignKeyViolation { source: e } } else { @@ -1540,16 +1531,23 @@ RETURNING } })?; - Ok(parquet_file) + Ok(parquet_file_id) } -async fn flag_for_delete<'q, E>(executor: E, id: ParquetFileId, marked_at: Timestamp) -> Result<()> +async fn flag_for_delete<'q, E>( + executor: E, + ids: &[ParquetFileId], + marked_at: Timestamp, +) -> Result<()> where E: Executor<'q, Database = Postgres>, { - let query = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = $2;"#) + // If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx. + // See https://github.com/launchbadge/sqlx/issues/1744 + let ids: Vec<_> = ids.iter().map(|p| p.get()).collect(); + let query = sqlx::query(r#"UPDATE parquet_file SET to_delete = $1 WHERE id = ANY($2);"#) .bind(marked_at) // $1 - .bind(id); // $2 + .bind(&ids[..]); // $2 query .execute(executor) .await @@ -1562,7 +1560,7 @@ async fn update_compaction_level<'q, E>( executor: E, parquet_file_ids: &[ParquetFileId], compaction_level: CompactionLevel, -) -> Result> +) -> Result<()> where E: Executor<'q, Database = Postgres>, { @@ -1573,19 +1571,17 @@ where r#" UPDATE parquet_file SET compaction_level = $1 -WHERE id = ANY($2) -RETURNING id; +WHERE id = ANY($2); "#, ) .bind(compaction_level) // $1 .bind(&ids[..]); // $2 - let updated = query - .fetch_all(executor) + query + .execute(executor) .await .map_err(|e| Error::SqlxError { source: e })?; - let updated = updated.into_iter().map(|row| row.get("id")).collect(); - Ok(updated) + Ok(()) } /// The error code returned by Postgres for a unique constraint violation. @@ -2173,7 +2169,7 @@ mod tests { // flag f1 for deletion and assert that the total file size is reduced accordingly. repos .parquet_files() - .flag_for_delete(f1.id) + .create_upgrade_delete(&[f1.id], &[], &[], CompactionLevel::Initial) .await .expect("flag parquet file for deletion should succeed"); let total_file_size_bytes: i64 = diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index b4efcd6119..bec243d0dc 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -1143,13 +1143,6 @@ FROM parquet_file; .collect()) } - async fn flag_for_delete(&mut self, id: ParquetFileId) -> Result<()> { - let marked_at = Timestamp::from(self.time_provider.now()); - let executor = self.inner.get_mut(); - - flag_for_delete(executor, id, marked_at).await - } - async fn flag_for_delete_by_retention(&mut self) -> Result> { let flagged_at = Timestamp::from(self.time_provider.now()); // TODO - include check of table retention period once implemented @@ -1776,7 +1769,7 @@ mod tests { // flag f1 for deletion and assert that the total file size is reduced accordingly. repos .parquet_files() - .flag_for_delete(f1.id) + .create_upgrade_delete(&[f1.id], &[], &[], CompactionLevel::Initial) .await .expect("flag parquet file for deletion should succeed"); let total_file_size_bytes: i64 = diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index 392d3e854c..d2f796d386 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -588,7 +588,7 @@ impl TestPartition { if to_delete { repos .parquet_files() - .flag_for_delete(parquet_file.id) + .create_upgrade_delete(&[parquet_file.id], &[], &[], CompactionLevel::Initial) .await .unwrap(); } @@ -817,9 +817,9 @@ impl TestParquetFile { repos .parquet_files() - .flag_for_delete(self.parquet_file.id) + .create_upgrade_delete(&[self.parquet_file.id], &[], &[], CompactionLevel::Initial) .await - .unwrap() + .unwrap(); } /// Get Parquet file schema.