fix: do not delete non-parquet files during catalog-driven cleanup
parent
5ed16ff294
commit
b55eae98da
|
@ -1,7 +1,7 @@
|
|||
//! Methods to cleanup the object store.
|
||||
|
||||
use futures::TryStreamExt;
|
||||
use object_store::{ObjectStore, ObjectStoreApi};
|
||||
use object_store::{path::parsed::DirsAndFileName, ObjectStore, ObjectStoreApi};
|
||||
use observability_deps::tracing::info;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
|
@ -41,7 +41,18 @@ where
|
|||
let mut files_removed = 0;
|
||||
while let Some(paths) = stream.try_next().await.context(ReadError)? {
|
||||
for path in paths {
|
||||
if !all_known.contains(&path.clone().into()) {
|
||||
let path_parsed: DirsAndFileName = path.clone().into();
|
||||
|
||||
// only delete if all of the following conditions are met:
|
||||
// - filename ends with `.parquet`
|
||||
// - file is not tracked by the catalog
|
||||
if path_parsed
|
||||
.file_name
|
||||
.as_ref()
|
||||
.map(|part| part.encoded().ends_with(".parquet"))
|
||||
.unwrap_or(false)
|
||||
&& !all_known.contains(&path_parsed)
|
||||
{
|
||||
store.delete(&path).await.context(WriteError)?;
|
||||
files_removed += 1;
|
||||
}
|
||||
|
@ -58,8 +69,9 @@ where
|
|||
mod tests {
|
||||
use std::{collections::HashSet, num::NonZeroU32, sync::Arc};
|
||||
|
||||
use bytes::Bytes;
|
||||
use data_types::server_id::ServerId;
|
||||
use object_store::path::ObjectStorePath;
|
||||
use object_store::path::{parsed::DirsAndFileName, ObjectStorePath, Path};
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
|
@ -83,21 +95,31 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
// create some data
|
||||
let mut paths_tracked = vec![];
|
||||
let mut paths_untracked = vec![];
|
||||
let mut paths_keep = vec![];
|
||||
let mut paths_delete = vec![];
|
||||
{
|
||||
let mut transaction = catalog.open_transaction().await;
|
||||
|
||||
// an ordinary tracked parquet file => keep
|
||||
let (path, md) = make_metadata(&object_store, "foo", 1).await;
|
||||
transaction.add_parquet(&path.clone().into(), &md).unwrap();
|
||||
paths_tracked.push(path.display());
|
||||
paths_keep.push(path.display());
|
||||
|
||||
// another ordinary tracked parquet file => keep
|
||||
let (path, md) = make_metadata(&object_store, "foo", 2).await;
|
||||
transaction.add_parquet(&path.clone().into(), &md).unwrap();
|
||||
paths_tracked.push(path.display());
|
||||
paths_keep.push(path.display());
|
||||
|
||||
// not a parquet file => keep
|
||||
let mut path: DirsAndFileName = path.into();
|
||||
path.file_name = Some("foo.txt".into());
|
||||
let path = object_store.path_from_dirs_and_filename(path);
|
||||
create_empty_file(&object_store, &path).await;
|
||||
paths_keep.push(path.display());
|
||||
|
||||
// an untracked parquet file => delete
|
||||
let (path, _md) = make_metadata(&object_store, "foo", 3).await;
|
||||
paths_untracked.push(path.display());
|
||||
paths_delete.push(path.display());
|
||||
|
||||
transaction.commit().await.unwrap();
|
||||
}
|
||||
|
@ -116,10 +138,10 @@ mod tests {
|
|||
.iter()
|
||||
.map(|p| p.display())
|
||||
.collect();
|
||||
for p in paths_tracked {
|
||||
for p in paths_keep {
|
||||
assert!(dbg!(&all_files).contains(dbg!(&p)));
|
||||
}
|
||||
for p in paths_untracked {
|
||||
for p in paths_delete {
|
||||
assert!(!dbg!(&all_files).contains(dbg!(&p)));
|
||||
}
|
||||
}
|
||||
|
@ -127,4 +149,18 @@ mod tests {
|
|||
fn make_server_id() -> ServerId {
|
||||
ServerId::new(NonZeroU32::new(1).unwrap())
|
||||
}
|
||||
|
||||
async fn create_empty_file(object_store: &ObjectStore, path: &Path) {
|
||||
let data = Bytes::default();
|
||||
let len = data.len();
|
||||
|
||||
object_store
|
||||
.put(
|
||||
&path,
|
||||
futures::stream::once(async move { Ok(data) }),
|
||||
Some(len),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue