diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs index a0702b401a..3a829e38cb 100644 --- a/parquet_file/src/cleanup.rs +++ b/parquet_file/src/cleanup.rs @@ -25,6 +25,8 @@ pub enum Error { pub type Result = std::result::Result; /// Delete all unreferenced parquet files. +/// +/// This will hold the transaction lock while the list of files is being gathered. pub async fn cleanup_unreferenced_parquet_files(catalog: &PreservedCatalog) -> Result<()> where S: CatalogState, @@ -37,30 +39,45 @@ where let store = catalog.object_store(); let prefix = data_location(&store, catalog.server_id(), catalog.db_name()); - let mut stream = store.list(Some(&prefix)).await.context(ReadError)?; - let mut files_removed = 0; - while let Some(paths) = stream.try_next().await.context(ReadError)? { - for path in paths { - let path_parsed: DirsAndFileName = path.clone().into(); + // gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long + let to_remove = store + .list(Some(&prefix)) + .await + .context(ReadError)? 
+ .map_ok(|paths| { + paths + .into_iter() + .filter(|path| { + let path_parsed: DirsAndFileName = path.clone().into(); - // only delete if all of the following conditions are met: - // - filename ends with `.parquet` - // - file is not tracked by the catalog - if path_parsed - .file_name - .as_ref() - .map(|part| part.encoded().ends_with(".parquet")) - .unwrap_or(false) - && !all_known.contains(&path_parsed) - { - store.delete(&path).await.context(WriteError)?; - files_removed += 1; - } - } + // only delete if all of the following conditions are met: + // - filename ends with `.parquet` + // - file is not tracked by the catalog + path_parsed + .file_name + .as_ref() + .map(|part| part.encoded().ends_with(".parquet")) + .unwrap_or(false) + && !all_known.contains(&path_parsed) + }) + .collect::>() + }) + .try_concat() + .await + .context(ReadError)?; + + // abort transaction cleanly to avoid warnings about uncommitted transactions + transaction.abort(); + + // now that the transaction lock is dropped, perform the actual (and potentially slow) delete operation + let n_files = to_remove.len(); + info!("Found {} files to delete, start deletion.", n_files); + + for path in to_remove { + store.delete(&path).await.context(WriteError)?; } - transaction.abort(); - info!("Removed {} files during clean-up.", files_removed); + info!("Finished deletion, removed {} files.", n_files); Ok(()) }