From 91df8a30e7f0a5fc258f3260e31f4fcea77abd30 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Thu, 3 Jun 2021 17:43:11 +0200
Subject: [PATCH] feat: limit number of files during storage cleanup

Since the number of parquet files can potentially be unbounded (i.e.
very large), and we do not want to hold the transaction lock for too
long or let the cleanup routine consume too much memory, limit the
number of files that are collected per cleanup round.
---
 parquet_file/src/cleanup.rs | 127 ++++++++++++++++++++++++++++--------
 server/src/db.rs            |   2 +-
 2 files changed, 100 insertions(+), 29 deletions(-)

diff --git a/parquet_file/src/cleanup.rs b/parquet_file/src/cleanup.rs
index 41e78c242a..6988fa7ae6 100644
--- a/parquet_file/src/cleanup.rs
+++ b/parquet_file/src/cleanup.rs
@@ -38,8 +38,12 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 /// Delete all unreferenced parquet files.
 ///
-/// This will hold the transaction lock while the list of files is being gathered.
-pub async fn cleanup_unreferenced_parquet_files<S>(catalog: &PreservedCatalog<S>) -> Result<()>
+/// This will hold the transaction lock while the list of files is being gathered. To limit the time the lock is held,
+/// use `max_files`, which limits the number of files that will be deleted in this cleanup round.
+pub async fn cleanup_unreferenced_parquet_files<S>(
+    catalog: &PreservedCatalog<S>,
+    max_files: usize,
+) -> Result<()>
 where
     S: CatalogState,
 {
@@ -72,31 +76,30 @@ where
     let prefix = data_location(&store, server_id, db_name);
 
     // gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long
-    let to_remove = store
-        .list(Some(&prefix))
-        .await
-        .context(ReadError)?
-        .map_ok(|paths| {
-            paths
-                .into_iter()
-                .filter(|path| {
-                    let path_parsed: DirsAndFileName = path.clone().into();
-
-                    // only delete if all of the following conditions are met:
-                    // - filename ends with `.parquet`
-                    // - file is not tracked by the catalog
-                    path_parsed
-                        .file_name
-                        .as_ref()
-                        .map(|part| part.encoded().ends_with(".parquet"))
-                        .unwrap_or(false)
-                        && !all_known.contains(&path_parsed)
-                })
-                .collect::<Vec<_>>()
-        })
-        .try_concat()
-        .await
-        .context(ReadError)?;
+    let mut to_remove = vec![];
+    let mut stream = store.list(Some(&prefix)).await.context(ReadError)?;
+
+    'outer: while let Some(paths) = stream.try_next().await.context(ReadError)? {
+        for path in paths {
+            if to_remove.len() >= max_files {
+                break 'outer;
+            }
+            let path_parsed: DirsAndFileName = path.clone().into();
+
+            // only delete if all of the following conditions are met:
+            // - filename ends with `.parquet`
+            // - file is not tracked by the catalog
+            if path_parsed
+                .file_name
+                .as_ref()
+                .map(|part| part.encoded().ends_with(".parquet"))
+                .unwrap_or(false)
+                && !all_known.contains(&path_parsed)
+            {
+                to_remove.push(path);
+            }
+        }
+    }
 
     // abort transaction cleanly to avoid warnings about uncommited transactions
     transaction.abort();
@@ -168,6 +171,27 @@ mod tests {
         test_utils::{make_metadata, make_object_store},
     };
 
+    #[tokio::test]
+    async fn test_cleanup_empty() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
+            Arc::clone(&object_store),
+            server_id,
+            db_name,
+            (),
+        )
+        .await
+        .unwrap();
+
+        // run clean-up
+        cleanup_unreferenced_parquet_files(&catalog, 1_000)
+            .await
+            .unwrap();
+    }
+
     #[tokio::test]
     async fn test_cleanup_rules() {
         let object_store = make_object_store();
@@ -215,7 +239,9 @@ mod tests {
         }
 
         // run clean-up
-        cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
+        cleanup_unreferenced_parquet_files(&catalog, 1_000)
+            .await
+            .unwrap();
 
         // list all files
         let all_files = list_all_files(&object_store).await;
@@ -256,7 +282,9 @@ mod tests {
                 path.display()
             },
             async {
-                cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
+                cleanup_unreferenced_parquet_files(&catalog, 1_000)
+                    .await
+                    .unwrap();
             },
         );
 
@@ -265,6 +293,49 @@ mod tests {
         }
     }
 
+    #[tokio::test]
+    async fn test_cleanup_max_files() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
+            Arc::clone(&object_store),
+            server_id,
+            db_name,
+            (),
+        )
+        .await
+        .unwrap();
+
+        // create some files
+        let mut to_remove: HashSet<String> = Default::default();
+        for chunk_id in 0..3 {
+            let (path, _md) = make_metadata(&object_store, "foo", chunk_id).await;
+            to_remove.insert(path.display());
+        }
+
+        // run clean-up
+        cleanup_unreferenced_parquet_files(&catalog, 2)
+            .await
+            .unwrap();
+
+        // should only delete 2
+        let all_files = list_all_files(&object_store).await;
+        let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
+        assert_eq!(leftover.len(), 1);
+
+        // run clean-up again
+        cleanup_unreferenced_parquet_files(&catalog, 2)
+            .await
+            .unwrap();
+
+        // should delete remaining file
+        let all_files = list_all_files(&object_store).await;
+        let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
+        assert_eq!(leftover.len(), 0);
+    }
+
     fn make_server_id() -> ServerId {
         ServerId::new(NonZeroU32::new(1).unwrap())
     }
diff --git a/server/src/db.rs b/server/src/db.rs
index f3ad58301e..15fdd8fe4e 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -965,7 +965,7 @@ impl Db {
                     debug!(?duration, "cleanup worker sleeps");
                     tokio::time::sleep(duration).await;
 
-                    if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await {
+                    if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog, 1_000).await {
                         error!(%e, "error in background cleanup task");
                     }
                 } => {},
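
Note (illustration only, not part of the patch): the core of this change is the bounded collection loop in cleanup.rs -- gather at most `max_files` orphaned `.parquet` paths per round and bail out of both loops with a labeled break once the budget is reached. The self-contained Rust sketch below shows the same pattern with plain in-memory collections; `collect_candidates`, the `batches` slice, and the `known` set are hypothetical stand-ins for the object store listing stream and the set of paths tracked by the catalog.

use std::collections::HashSet;

// Collect at most `max_files` deletion candidates: paths that look like
// parquet files and are not tracked by the (stand-in) catalog.
fn collect_candidates(
    batches: &[Vec<String>],  // stand-in for the object store listing stream
    known: &HashSet<String>,  // stand-in for paths tracked by the catalog
    max_files: usize,
) -> Vec<String> {
    let mut to_remove = Vec::new();

    'outer: for batch in batches {
        for path in batch {
            if to_remove.len() >= max_files {
                // budget reached: stop so a single cleanup round stays bounded
                break 'outer;
            }
            if path.ends_with(".parquet") && !known.contains(path) {
                to_remove.push(path.clone());
            }
        }
    }

    to_remove
}

fn main() {
    let batches = vec![
        vec!["a.parquet".to_string(), "b.parquet".to_string()],
        vec!["c.parquet".to_string(), "catalog.txn".to_string()],
    ];
    let known: HashSet<String> = std::iter::once("b.parquet".to_string()).collect();

    // with a budget of 1 only the first orphan is collected in this round
    assert_eq!(collect_candidates(&batches, &known, 1), vec!["a.parquet"]);
    // a larger budget picks up the remaining orphan as well
    assert_eq!(
        collect_candidates(&batches, &known, 10),
        vec!["a.parquet", "c.parquet"]
    );
}

Leftovers are simply picked up on a later round, which is the behaviour test_cleanup_max_files exercises above: three files and a budget of two require two passes.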