fix: Make file behave the same as other object stores with paths

pull/24376/head
Carol (Nichols || Goulding) 2021-01-14 16:05:10 -05:00
parent 0415d4a186
commit 813092649d
4 changed files with 25 additions and 22 deletions

1
Cargo.lock generated
View File

@ -2214,6 +2214,7 @@ dependencies = [
"tempfile", "tempfile",
"tokio", "tokio",
"tokio-util", "tokio-util",
"walkdir",
] ]
[[package]] [[package]]

View File

@ -23,6 +23,7 @@ tokio = { version = "0.2", features = ["full"] }
# Filesystem integration # Filesystem integration
tokio-util = "0.3.1" tokio-util = "0.3.1"
walkdir = "2"
# Microsoft Azure Blob storage integration # Microsoft Azure Blob storage integration
azure_sdk_core = "0.43.7" azure_sdk_core = "0.43.7"

View File

@ -3,15 +3,15 @@
use crate::{ use crate::{
path::{FileConverter, ObjectStorePath}, path::{FileConverter, ObjectStorePath},
DataDoesNotMatchLength, Result, UnableToCopyDataToFile, UnableToCreateDir, UnableToCreateFile, DataDoesNotMatchLength, Result, UnableToCopyDataToFile, UnableToCreateDir, UnableToCreateFile,
UnableToDeleteFile, UnableToListDirectory, UnableToOpenFile, UnableToProcessEntry, UnableToDeleteFile, UnableToOpenFile, UnableToPutDataInMemory, UnableToReadBytes,
UnableToPutDataInMemory, UnableToReadBytes,
}; };
use bytes::Bytes; use bytes::Bytes;
use futures::{Stream, TryStreamExt}; use futures::{stream, Stream, TryStreamExt};
use snafu::{ensure, futures::TryStreamExt as _, OptionExt, ResultExt}; use snafu::{ensure, futures::TryStreamExt as _, OptionExt, ResultExt};
use std::{io, path::PathBuf}; use std::{io, path::PathBuf};
use tokio::fs; use tokio::fs;
use tokio_util::codec::{BytesCodec, FramedRead}; use tokio_util::codec::{BytesCodec, FramedRead};
use walkdir::WalkDir;
/// Local filesystem storage suitable for testing or for opting out of using a /// Local filesystem storage suitable for testing or for opting out of using a
/// cloud storage provider. /// cloud storage provider.
@ -111,24 +111,26 @@ impl File {
&'a self, &'a self,
prefix: Option<&'a ObjectStorePath>, prefix: Option<&'a ObjectStorePath>,
) -> Result<impl Stream<Item = Result<Vec<ObjectStorePath>>> + 'a> { ) -> Result<impl Stream<Item = Result<Vec<ObjectStorePath>>> + 'a> {
let dirs = fs::read_dir(FileConverter::convert(&self.root)) let root_path = FileConverter::convert(&self.root);
.await let walkdir = WalkDir::new(&root_path)
.context(UnableToListDirectory { // Don't include the root directory itself
path: format!("{:?}", self.root), .min_depth(1);
})?;
let s = dirs let s = walkdir.into_iter().filter_map(move |result_dir_entry| {
.context(UnableToProcessEntry) result_dir_entry
.and_then(|entry| { .ok()
let file_path_buf: PathBuf = entry.file_name().into(); .filter(|dir_entry| dir_entry.file_type().is_file())
async move { Ok(ObjectStorePath::from_path_buf_unchecked(file_path_buf)) } .map(|file| {
let relative_path = file.path().strip_prefix(&root_path).expect(
"Must start with root path because this came from walking the root",
);
ObjectStorePath::from_path_buf_unchecked(relative_path)
}) })
.try_filter(move |name| { .filter(|name| prefix.map_or(true, |p| name.prefix_matches(p)))
let matches = prefix.map_or(true, |p| name.prefix_matches(p)); .map(|name| Ok(vec![name]))
async move { matches } });
})
.map_ok(|name| vec![name]); Ok(stream::iter(s))
Ok(s)
} }
} }
@ -145,7 +147,6 @@ mod tests {
use futures::stream; use futures::stream;
#[tokio::test] #[tokio::test]
#[ignore]
async fn file_test() -> Result<()> { async fn file_test() -> Result<()> {
let root = TempDir::new()?; let root = TempDir::new()?;
let integration = ObjectStore::new_file(File::new(root.path())); let integration = ObjectStore::new_file(File::new(root.path()));

View File

@ -425,7 +425,7 @@ impl FileConverter {
.map(|p| &p.0) .map(|p| &p.0)
.collect(); .collect();
if let Some(file_name) = &dirs_and_file_name.file_name { if let Some(file_name) = &dirs_and_file_name.file_name {
path.set_file_name(&file_name.0); path.push(&file_name.0);
} }
path path
} }