feat: limit number of files during storage cleanup

Since the number of parquet files can potentially be unbound (aka very
very large) and we do not want to hold the transaction lock for too
long and also want to limit memory consumption of the cleanup routine,
let's limit the number of files that we collect for cleanup.
pull/24376/head
Marco Neumann 2021-06-03 17:43:11 +02:00
parent 5037f5e23c
commit 91df8a30e7
2 changed files with 100 additions and 29 deletions

View File

@ -38,8 +38,12 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Delete all unreferenced parquet files.
///
/// This will hold the transaction lock while the list of files is being gathered.
pub async fn cleanup_unreferenced_parquet_files<S>(catalog: &PreservedCatalog<S>) -> Result<()>
/// This will hold the transaction lock while the list of files is being gathered. To limit the time the lock is held
/// use `max_files` which will limit the number of files to delete in this cleanup round.
pub async fn cleanup_unreferenced_parquet_files<S>(
catalog: &PreservedCatalog<S>,
max_files: usize,
) -> Result<()>
where
S: CatalogState,
{
@ -72,31 +76,30 @@ where
let prefix = data_location(&store, server_id, db_name);
// gather a list of "files to remove" eagerly so we do not block transactions on the catalog for too long
let to_remove = store
.list(Some(&prefix))
.await
.context(ReadError)?
.map_ok(|paths| {
paths
.into_iter()
.filter(|path| {
let mut to_remove = vec![];
let mut stream = store.list(Some(&prefix)).await.context(ReadError)?;
'outer: while let Some(paths) = stream.try_next().await.context(ReadError)? {
for path in paths {
if to_remove.len() >= max_files {
break 'outer;
}
let path_parsed: DirsAndFileName = path.clone().into();
// only delete if all of the following conditions are met:
// - filename ends with `.parquet`
// - file is not tracked by the catalog
path_parsed
if path_parsed
.file_name
.as_ref()
.map(|part| part.encoded().ends_with(".parquet"))
.unwrap_or(false)
&& !all_known.contains(&path_parsed)
})
.collect::<Vec<_>>()
})
.try_concat()
.await
.context(ReadError)?;
{
to_remove.push(path);
}
}
}
// abort transaction cleanly to avoid warnings about uncommited transactions
transaction.abort();
@ -168,6 +171,27 @@ mod tests {
test_utils::{make_metadata, make_object_store},
};
#[tokio::test]
async fn test_cleanup_empty() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store),
server_id,
db_name,
(),
)
.await
.unwrap();
// run clean-up
cleanup_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
}
#[tokio::test]
async fn test_cleanup_rules() {
let object_store = make_object_store();
@ -215,7 +239,9 @@ mod tests {
}
// run clean-up
cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
cleanup_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
// list all files
let all_files = list_all_files(&object_store).await;
@ -256,7 +282,9 @@ mod tests {
path.display()
},
async {
cleanup_unreferenced_parquet_files(&catalog).await.unwrap();
cleanup_unreferenced_parquet_files(&catalog, 1_000)
.await
.unwrap();
},
);
@ -265,6 +293,49 @@ mod tests {
}
}
#[tokio::test]
async fn test_cleanup_max_files() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let catalog = PreservedCatalog::<TestCatalogState>::new_empty(
Arc::clone(&object_store),
server_id,
db_name,
(),
)
.await
.unwrap();
// create some files
let mut to_remove: HashSet<String> = Default::default();
for chunk_id in 0..3 {
let (path, _md) = make_metadata(&object_store, "foo", chunk_id).await;
to_remove.insert(path.display());
}
// run clean-up
cleanup_unreferenced_parquet_files(&catalog, 2)
.await
.unwrap();
// should only delete 2
let all_files = list_all_files(&object_store).await;
let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
assert_eq!(leftover.len(), 1);
// run clean-up again
cleanup_unreferenced_parquet_files(&catalog, 2)
.await
.unwrap();
// should delete remaining file
let all_files = list_all_files(&object_store).await;
let leftover: HashSet<_> = all_files.intersection(&to_remove).collect();
assert_eq!(leftover.len(), 0);
}
fn make_server_id() -> ServerId {
ServerId::new(NonZeroU32::new(1).unwrap())
}

View File

@ -965,7 +965,7 @@ impl Db {
debug!(?duration, "cleanup worker sleeps");
tokio::time::sleep(duration).await;
if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog).await {
if let Err(e) = cleanup_unreferenced_parquet_files(&self.catalog, 1_000).await {
error!(%e, "error in background cleanup task");
}
} => {},