diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs
index 41e78c242a..6988fa7ae6 100644
--- a/parquet_file/src/cleanup.rs
+++ b/parquet_file/src/cleanup.rs
@@ -38,8 +38,12 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 /// Delete all unreferenced parquet files.
 ///
-/// This will hold the transaction lock while the list of files is being gathered.
-pub async fn cleanup_unreferenced_parquet_files<S>(catalog: &PreservedCatalog<S>) -> Result<()>
+/// This will hold the transaction lock while the list of files is being gathered. To limit the time the lock is held,
+/// use `max_files`, which caps the number of files deleted in a single cleanup round.
+pub async fn cleanup_unreferenced_parquet_files<S>(
+    catalog: &PreservedCatalog<S>,
+    max_files: usize,
+) -> Result<()>
 where
     S: CatalogState,
 {
@@ -72,31 +76,30 @@
     let prefix = data_location(&store, server_id, db_name);
 
     // gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long
-    let to_remove = store
-        .list(Some(&prefix))
-        .await
-        .context(ReadError)?
-        .map_ok(|paths| {
-            paths
-                .into_iter()
-                .filter(|path| {
-                    let path_parsed: DirsAndFileName = path.clone().into();
+    let mut to_remove = vec![];
+    let mut stream = store.list(Some(&prefix)).await.context(ReadError)?;
 
-                    // only delete if all of the following conditions are met:
-                    // - filename ends with `.parquet`
-                    // - file is not tracked by the catalog
-                    path_parsed
-                        .file_name
-                        .as_ref()
-                        .map(|part| part.encoded().ends_with(".parquet"))
-                        .unwrap_or(false)
-                        && !all_known.contains(&path_parsed)
-                })
-                .collect::<Vec<_>>()
-        })
-        .try_concat()
-        .await
-        .context(ReadError)?;
+    'outer: while let Some(paths) = stream.try_next().await.context(ReadError)? {
+        for path in paths {
+            if to_remove.len() >= max_files {
+                break 'outer;
+            }
+            let path_parsed: DirsAndFileName = path.clone().into();
+
+            // only delete if all of the following conditions are met:
+            // - filename ends with `.parquet`
+            // - file is not tracked by the catalog
+            if path_parsed
+                .file_name
+                .as_ref()
+                .map(|part| part.encoded().ends_with(".parquet"))
+                .unwrap_or(false)
+                && !all_known.contains(&path_parsed)
+            {
+                to_remove.push(path);
+            }
+        }
+    }
 
     // abort transaction cleanly to avoid warnings about uncommitted transactions
     transaction.abort();
@@ -168,6 +171,27 @@ mod tests {
         test_utils::{make_metadata, make_object_store},
     };
 
+    #[tokio::test]
+    async fn test_cleanup_empty() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
+            Arc::clone(&object_store),
+            server_id,
+            db_name,
+            (),
+        )
+        .await
+        .unwrap();
+
+        // run clean-up
+        cleanup_unreferenced_parquet_files(&catalog, 1_000)
+            .await
+            .unwrap();
+    }
+
     #[tokio::test]
     async fn test_cleanup_rules() {
         let object_store = make_object_store();
@@ -215,7 +239,9 @@
         }
 
         // run clean-up
-        cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
+        cleanup_unreferenced_parquet_files(&catalog, 1_000)
+            .await
+            .unwrap();
 
         // list all files
         let all_files = list_all_files(&object_store).await;
@@ -256,7 +282,9 @@
                 path.display()
             },
             async {
-                cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
+                cleanup_unreferenced_parquet_files(&catalog, 1_000)
+                    .await
+                    .unwrap();
             },
         );
 
@@ -265,6 +293,49 @@
         }
     }
 
+    #[tokio::test]
+    async fn test_cleanup_max_files() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
+            Arc::clone(&object_store),
+            server_id,
+            db_name,
+            (),
+        )
+        .await
+        .unwrap();
+
+        // create some files
+        let mut to_remove: HashSet<String> = Default::default();
+        for chunk_id in 0..3 {
+            let (path, _md) = make_metadata(&object_store, "foo", chunk_id).await;
+            to_remove.insert(path.display());
+        }
+
+        // run clean-up
+        cleanup_unreferenced_parquet_files(&catalog, 2)
+            .await
+            .unwrap();
+
+        // should only delete 2 files
+        let all_files = list_all_files(&object_store).await;
+        let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
+        assert_eq!(leftover.len(), 1);
+
+        // run clean-up again
+        cleanup_unreferenced_parquet_files(&catalog, 2)
+            .await
+            .unwrap();
+
+        // should delete the remaining file
+        let all_files = list_all_files(&object_store).await;
+        let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
+        assert_eq!(leftover.len(), 0);
+    }
+
     fn make_server_id() -> ServerId {
         ServerId::new(NonZeroU32::new(1).unwrap())
     }
diff --git a/server/src/db.rs b/server/src/db.rs
index f3ad58301e..15fdd8fe4e 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -965,7 +965,7 @@ impl Db {
                     debug!(?duration, "cleanup worker sleeps");
                     tokio::time::sleep(duration).await;
 
-                    if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await {
+                    if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog, 1_000).await {
                         error!(%e, "error in background cleanup task");
                     }
                 } => {},
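
Note: the bounded-gathering loop added above boils down to a small pattern that is worth seeing in isolation. The sketch below is illustrative only and is not part of the patch: `stream::iter` stands in for the object store's `list` call, the `.parquet` suffix check stands in for the real filename-plus-catalog filter, and only the `futures` and `tokio` crates are assumed.

use futures::{stream, TryStreamExt};

/// Drain an async stream of path batches, but stop as soon as `max_files`
/// deletion candidates have been gathered so later rounds pick up the rest.
async fn gather_bounded(max_files: usize) -> Result<Vec<String>, std::io::Error> {
    // Stand-in for `store.list(Some(&prefix))`: a fallible stream of batches.
    let mut stream = stream::iter(vec![
        Ok(vec!["a.parquet".to_string(), "b.txt".to_string()]),
        Ok(vec!["c.parquet".to_string(), "d.parquet".to_string()]),
    ]);

    let mut to_remove = vec![];
    'outer: while let Some(batch) = stream.try_next().await? {
        for path in batch {
            if to_remove.len() >= max_files {
                // Bound reached: abandon both loops without draining the stream.
                break 'outer;
            }
            // Stand-in for the real filter (parquet suffix + not in catalog).
            if path.ends_with(".parquet") {
                to_remove.push(path);
            }
        }
    }
    Ok(to_remove)
}

#[tokio::main]
async fn main() {
    // With max_files = 2, only the first two matching paths are collected.
    assert_eq!(
        gather_bounded(2).await.unwrap(),
        vec!["a.parquet", "c.parquet"]
    );
}

Because the bound is checked before the filter, files that fail the filter never count against `max_files`; a round that finds fewer than `max_files` candidates simply drains the stream, and the next wakeup of the background cleanup worker deletes whatever remains.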