feat: Implement list_with_delimiter for File object store

Fixes #688.
pull/24376/head
Carol (Nichols || Goulding) 2021-02-04 10:53:48 -05:00
parent 5b18f7dbea
commit f9454fb57f
2 changed files with 102 additions and 10 deletions

View File

@ -1,9 +1,10 @@
//! This module contains the IOx implementation for using local disk as the
//! object store.
use crate::{
path::file::FilePath, DataDoesNotMatchLength, ListResult, ObjectStoreApi, Result,
UnableToCopyDataToFile, UnableToCreateDir, UnableToCreateFile, UnableToDeleteFile,
UnableToOpenFile, UnableToReadBytes, UnableToStreamDataIntoMemory,
path::file::FilePath, DataDoesNotMatchLength, FileSizeOverflowedUsize, ListResult, ObjectMeta,
ObjectStoreApi, Result, UnableToAccessMetadata, UnableToCopyDataToFile, UnableToCreateDir,
UnableToCreateFile, UnableToDeleteFile, UnableToOpenFile, UnableToProcessEntry,
UnableToReadBytes, UnableToStreamDataIntoMemory,
};
use async_trait::async_trait;
use bytes::Bytes;
@ -12,7 +13,7 @@ use futures::{
Stream, StreamExt, TryStreamExt,
};
use snafu::{ensure, futures::TryStreamExt as _, OptionExt, ResultExt};
use std::{io, path::PathBuf};
use std::{collections::BTreeSet, convert::TryFrom, io, path::PathBuf};
use tokio::fs;
use tokio_util::codec::{BytesCodec, FramedRead};
use walkdir::WalkDir;
@ -124,8 +125,80 @@ impl ObjectStoreApi for File {
Ok(stream::iter(s).boxed())
}
async fn list_with_delimiter(&self, _prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
unimplemented!()
async fn list_with_delimiter(&self, prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
// Always treat prefix as relative because the list operations don't know
// anything about where on disk the root of this object store is; they
// only care about what's within this object store's directory. See
// documentation for `push_path`: it deliberately does *not* behave as
// `PathBuf::push` does: there is no way to replace the root. So even if
// `prefix` isn't relative, we treat it as such here.
let mut resolved_prefix = self.root.clone();
resolved_prefix.push_path(prefix);
// It is valid to specify a prefix with directories `[foo, bar]` and filename
// `baz`, in which case we want to treat it like a glob for
// `foo/bar/baz*` and there may not actually be a file or directory
// named `foo/bar/baz`. We want to look at all the entries in
// `foo/bar/`, so remove the file name.
let mut search_path = resolved_prefix.clone();
search_path.unset_file_name();
let walkdir = WalkDir::new(&search_path.to_raw())
.min_depth(1)
.max_depth(1);
let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();
let root_path = self.root.to_raw();
for entry in walkdir {
let entry = entry
.map_err(|e| std::io::Error::from(e))
.context(UnableToProcessEntry)?;
let entry_location = FilePath::raw(entry.path());
if entry_location.prefix_matches(&resolved_prefix) {
let metadata = entry
.metadata()
.map_err(|e| std::io::Error::from(e))
.context(UnableToAccessMetadata { path: entry.path() })?;
if metadata.is_dir() {
let parts = entry_location
.parts_after_prefix(&resolved_prefix)
.expect("must have prefix because of the if prefix_matches condition");
let mut relative_location = prefix.to_owned();
relative_location.push_part_as_dir(&parts[0]);
common_prefixes.insert(relative_location);
} else {
let path = entry
.path()
.strip_prefix(&root_path)
.expect("must have prefix because of the if prefix_matches condition");
let location = FilePath::raw(path);
let last_modified = metadata
.modified()
.expect("Modified file time should be supported on this platform")
.into();
let size = usize::try_from(metadata.len())
.context(FileSizeOverflowedUsize { path: entry.path() })?;
objects.push(ObjectMeta {
location,
last_modified,
size,
});
}
}
}
Ok(ListResult {
next_token: None,
common_prefixes: common_prefixes.into_iter().collect(),
objects,
})
}
}
@ -151,10 +224,12 @@ mod tests {
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = TestError> = std::result::Result<T, E>;
use tempfile::TempDir;
use crate::{tests::put_get_delete_list, Error, ObjectStoreApi, ObjectStorePath};
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
Error, ObjectStoreApi, ObjectStorePath,
};
use futures::stream;
use tempfile::TempDir;
#[tokio::test]
async fn file_test() -> Result<()> {
@ -162,6 +237,8 @@ mod tests {
let integration = File::new(root.path());
put_get_delete_list(&integration).await?;
list_with_delimiter(&integration).await?;
Ok(())
}

View File

@ -268,7 +268,11 @@ impl ObjectStoreApi for ObjectStore {
.map_ok(|list_result| list_result.map_paths(path::Path::InMemory))
.await
}
(File(_file), _) => unimplemented!(),
(File(file), path::Path::File(prefix)) => {
file.list_with_delimiter(prefix)
.map_ok(|list_result| list_result.map_paths(path::Path::File))
.await
}
(MicrosoftAzure(_azure), _) => unimplemented!(),
_ => unreachable!(),
}
@ -494,6 +498,17 @@ pub enum Error {
UnableToCopyDataToFile {
source: io::Error,
},
#[snafu(display("Unable to access metadata for {}: {}", path.display(), source))]
UnableToAccessMetadata {
source: io::Error,
path: PathBuf,
},
#[snafu(display("File size for {} did not fit in a usize: {}", path.display(), source))]
FileSizeOverflowedUsize {
source: std::num::TryFromIntError,
path: PathBuf,
},
}
#[cfg(test)]