refactor: shorten time we hold the transaction lock during clean-up
parent
18f5dd9ae1
commit
d7e3bc569e
|
@ -25,6 +25,8 @@ pub enum Error {
|
|||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// Delete all unreferenced parquet files.
|
||||
///
|
||||
/// This will hold the transaction lock while the list of files is being gathered.
|
||||
pub async fn cleanup_unreferenced_parquet_files<S>(catalog: &PreservedCatalog<S>) -> Result<()>
|
||||
where
|
||||
S: CatalogState,
|
||||
|
@ -37,30 +39,45 @@ where
|
|||
let store = catalog.object_store();
|
||||
let prefix = data_location(&store, catalog.server_id(), catalog.db_name());
|
||||
|
||||
let mut stream = store.list(Some(&prefix)).await.context(ReadError)?;
|
||||
let mut files_removed = 0;
|
||||
while let Some(paths) = stream.try_next().await.context(ReadError)? {
|
||||
for path in paths {
|
||||
let path_parsed: DirsAndFileName = path.clone().into();
|
||||
// gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long
|
||||
let to_remove = store
|
||||
.list(Some(&prefix))
|
||||
.await
|
||||
.context(ReadError)?
|
||||
.map_ok(|paths| {
|
||||
paths
|
||||
.into_iter()
|
||||
.filter(|path| {
|
||||
let path_parsed: DirsAndFileName = path.clone().into();
|
||||
|
||||
// only delete if all of the following conditions are met:
|
||||
// - filename ends with `.parquet`
|
||||
// - file is not tracked by the catalog
|
||||
if path_parsed
|
||||
.file_name
|
||||
.as_ref()
|
||||
.map(|part| part.encoded().ends_with(".parquet"))
|
||||
.unwrap_or(false)
|
||||
&& !all_known.contains(&path_parsed)
|
||||
{
|
||||
store.delete(&path).await.context(WriteError)?;
|
||||
files_removed += 1;
|
||||
}
|
||||
}
|
||||
// only delete if all of the following conditions are met:
|
||||
// - filename ends with `.parquet`
|
||||
// - file is not tracked by the catalog
|
||||
path_parsed
|
||||
.file_name
|
||||
.as_ref()
|
||||
.map(|part| part.encoded().ends_with(".parquet"))
|
||||
.unwrap_or(false)
|
||||
&& !all_known.contains(&path_parsed)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.try_concat()
|
||||
.await
|
||||
.context(ReadError)?;
|
||||
|
||||
// abort transaction cleanly to avoid warnings about uncommited transactions
|
||||
transaction.abort();
|
||||
|
||||
// now that the transaction lock is dropped, perform the actual (and potentially slow) delete operation
|
||||
let n_files = to_remove.len();
|
||||
info!("Found {} files to delete, start deletion.", n_files);
|
||||
|
||||
for path in to_remove {
|
||||
store.delete(&path).await.context(WriteError)?;
|
||||
}
|
||||
|
||||
transaction.abort();
|
||||
info!("Removed {} files during clean-up.", files_removed);
|
||||
info!("Finished deletion, removed {} files.", n_files);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue