fix: do not silence all IO errors in disk-based LIST
parent
a98b10745f
commit
24b249ad7b
|
@ -29,6 +29,9 @@ pub enum Error {
|
|||
path: PathBuf,
|
||||
},
|
||||
|
||||
#[snafu(display("Unable to walk dir: {}", source))]
|
||||
UnableToWalkDir { source: walkdir::Error },
|
||||
|
||||
#[snafu(display("Unable to access metadata for {}: {}", path.display(), source))]
|
||||
UnableToAccessMetadata {
|
||||
source: walkdir::Error,
|
||||
|
@ -150,19 +153,23 @@ impl ObjectStoreApi for File {
|
|||
// Don't include the root directory itself
|
||||
.min_depth(1);
|
||||
|
||||
let s = walkdir.into_iter().filter_map(move |result_dir_entry| {
|
||||
result_dir_entry
|
||||
.ok()
|
||||
.filter(|dir_entry| dir_entry.file_type().is_file())
|
||||
.map(|file| {
|
||||
let relative_path = file.path().strip_prefix(&root_path).expect(
|
||||
"Must start with root path because this came from walking the root",
|
||||
);
|
||||
FilePath::raw(relative_path, false)
|
||||
})
|
||||
.filter(|name| prefix.map_or(true, |p| name.prefix_matches(p)))
|
||||
.map(|name| Ok(vec![name]))
|
||||
});
|
||||
let s =
|
||||
walkdir.into_iter().filter_map(move |result_dir_entry| {
|
||||
match convert_walkdir_result(result_dir_entry) {
|
||||
Err(e) => Some(Err(e)),
|
||||
Ok(None) => None,
|
||||
Ok(entry @ Some(_)) => entry
|
||||
.filter(|dir_entry| dir_entry.file_type().is_file())
|
||||
.map(|file| {
|
||||
let relative_path = file.path().strip_prefix(&root_path).expect(
|
||||
"Must start with root path because this came from walking the root",
|
||||
);
|
||||
FilePath::raw(relative_path, false)
|
||||
})
|
||||
.filter(|name| prefix.map_or(true, |p| name.prefix_matches(p)))
|
||||
.map(|name| Ok(vec![name])),
|
||||
}
|
||||
});
|
||||
|
||||
Ok(stream::iter(s).boxed())
|
||||
}
|
||||
|
@ -193,41 +200,43 @@ impl ObjectStoreApi for File {
|
|||
let mut objects = Vec::new();
|
||||
|
||||
let root_path = self.root.to_raw();
|
||||
for entry in walkdir.into_iter().filter_map(Result::ok) {
|
||||
let entry_location = FilePath::raw(entry.path(), false);
|
||||
for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
|
||||
if let Some(entry) = entry_res? {
|
||||
let entry_location = FilePath::raw(entry.path(), false);
|
||||
|
||||
if entry_location.prefix_matches(&resolved_prefix) {
|
||||
let metadata = entry
|
||||
.metadata()
|
||||
.context(UnableToAccessMetadata { path: entry.path() })?;
|
||||
if entry_location.prefix_matches(&resolved_prefix) {
|
||||
let metadata = entry
|
||||
.metadata()
|
||||
.context(UnableToAccessMetadata { path: entry.path() })?;
|
||||
|
||||
if metadata.is_dir() {
|
||||
let parts = entry_location
|
||||
.parts_after_prefix(&resolved_prefix)
|
||||
.expect("must have prefix because of the if prefix_matches condition");
|
||||
if metadata.is_dir() {
|
||||
let parts = entry_location
|
||||
.parts_after_prefix(&resolved_prefix)
|
||||
.expect("must have prefix because of the if prefix_matches condition");
|
||||
|
||||
let mut relative_location = prefix.to_owned();
|
||||
relative_location.push_part_as_dir(&parts[0]);
|
||||
common_prefixes.insert(relative_location);
|
||||
} else {
|
||||
let path = entry
|
||||
.path()
|
||||
.strip_prefix(&root_path)
|
||||
.expect("must have prefix because of the if prefix_matches condition");
|
||||
let location = FilePath::raw(path, false);
|
||||
let mut relative_location = prefix.to_owned();
|
||||
relative_location.push_part_as_dir(&parts[0]);
|
||||
common_prefixes.insert(relative_location);
|
||||
} else {
|
||||
let path = entry
|
||||
.path()
|
||||
.strip_prefix(&root_path)
|
||||
.expect("must have prefix because of the if prefix_matches condition");
|
||||
let location = FilePath::raw(path, false);
|
||||
|
||||
let last_modified = metadata
|
||||
.modified()
|
||||
.expect("Modified file time should be supported on this platform")
|
||||
.into();
|
||||
let size = usize::try_from(metadata.len())
|
||||
.context(FileSizeOverflowedUsize { path: entry.path() })?;
|
||||
let last_modified = metadata
|
||||
.modified()
|
||||
.expect("Modified file time should be supported on this platform")
|
||||
.into();
|
||||
let size = usize::try_from(metadata.len())
|
||||
.context(FileSizeOverflowedUsize { path: entry.path() })?;
|
||||
|
||||
objects.push(ObjectMeta {
|
||||
location,
|
||||
last_modified,
|
||||
size,
|
||||
});
|
||||
objects.push(ObjectMeta {
|
||||
location,
|
||||
last_modified,
|
||||
size,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -240,6 +249,26 @@ impl ObjectStoreApi for File {
|
|||
}
|
||||
}
|
||||
|
||||
/// Convert walkdir results and converts not-found errors into `None`.
|
||||
fn convert_walkdir_result(
|
||||
res: std::result::Result<walkdir::DirEntry, walkdir::Error>,
|
||||
) -> Result<Option<walkdir::DirEntry>> {
|
||||
match res {
|
||||
Ok(entry) => Ok(Some(entry)),
|
||||
Err(walkdir_err) => match walkdir_err.io_error() {
|
||||
Some(io_err) => match io_err.kind() {
|
||||
io::ErrorKind::NotFound => Ok(None),
|
||||
_ => Err(Error::UnableToWalkDir {
|
||||
source: walkdir_err,
|
||||
}),
|
||||
},
|
||||
None => Err(Error::UnableToWalkDir {
|
||||
source: walkdir_err,
|
||||
}),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
impl File {
|
||||
/// Create new filesystem storage.
|
||||
pub fn new(root: impl Into<PathBuf>) -> Self {
|
||||
|
@ -258,6 +287,8 @@ impl File {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
|
||||
|
||||
use super::*;
|
||||
|
||||
use crate::{
|
||||
|
@ -356,4 +387,36 @@ mod tests {
|
|||
.unwrap();
|
||||
assert_eq!(&*read_data, data);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn bubble_up_io_errors() {
|
||||
let root = TempDir::new().unwrap();
|
||||
|
||||
// make non-readable
|
||||
let metadata = root.path().metadata().unwrap();
|
||||
let mut permissions = metadata.permissions();
|
||||
permissions.set_mode(0o000);
|
||||
set_permissions(root.path(), permissions).unwrap();
|
||||
|
||||
let store = File::new(root.path());
|
||||
|
||||
// `list` must fail
|
||||
match store.list(None).await {
|
||||
Err(_) => {
|
||||
// ok, error found
|
||||
}
|
||||
Ok(mut stream) => {
|
||||
let mut any_err = false;
|
||||
while let Some(res) = stream.next().await {
|
||||
if res.is_err() {
|
||||
any_err = true;
|
||||
}
|
||||
}
|
||||
assert!(any_err);
|
||||
}
|
||||
}
|
||||
|
||||
// `list_with_delimiter
|
||||
assert!(store.list_with_delimiter(&store.new_path()).await.is_err());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue