From 813092649d754c407e1418c7e0c85ddc8e65b3a4 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 14 Jan 2021 16:05:10 -0500 Subject: [PATCH] fix: Make file behave the same as other object stores with paths --- Cargo.lock | 1 + object_store/Cargo.toml | 1 + object_store/src/disk.rs | 43 ++++++++++++++++++++-------------------- object_store/src/path.rs | 2 +- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb7d562514..ba68e30ef2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2214,6 +2214,7 @@ dependencies = [ "tempfile", "tokio", "tokio-util", + "walkdir", ] [[package]] diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 9ad96eb6fb..993de28229 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -23,6 +23,7 @@ tokio = { version = "0.2", features = ["full"] } # Filesystem integration tokio-util = "0.3.1" +walkdir = "2" # Microsoft Azure Blob storage integration azure_sdk_core = "0.43.7" diff --git a/object_store/src/disk.rs b/object_store/src/disk.rs index 32871728fc..46fdad862c 100644 --- a/object_store/src/disk.rs +++ b/object_store/src/disk.rs @@ -3,15 +3,15 @@ use crate::{ path::{FileConverter, ObjectStorePath}, DataDoesNotMatchLength, Result, UnableToCopyDataToFile, UnableToCreateDir, UnableToCreateFile, - UnableToDeleteFile, UnableToListDirectory, UnableToOpenFile, UnableToProcessEntry, - UnableToPutDataInMemory, UnableToReadBytes, + UnableToDeleteFile, UnableToOpenFile, UnableToPutDataInMemory, UnableToReadBytes, }; use bytes::Bytes; -use futures::{Stream, TryStreamExt}; +use futures::{stream, Stream, TryStreamExt}; use snafu::{ensure, futures::TryStreamExt as _, OptionExt, ResultExt}; use std::{io, path::PathBuf}; use tokio::fs; use tokio_util::codec::{BytesCodec, FramedRead}; +use walkdir::WalkDir; /// Local filesystem storage suitable for testing or for opting out of using a /// cloud storage provider. @@ -111,24 +111,26 @@ impl File { &'a self, prefix: Option<&'a ObjectStorePath>, ) -> Result>> + 'a> { - let dirs = fs::read_dir(FileConverter::convert(&self.root)) - .await - .context(UnableToListDirectory { - path: format!("{:?}", self.root), - })?; + let root_path = FileConverter::convert(&self.root); + let walkdir = WalkDir::new(&root_path) + // Don't include the root directory itself + .min_depth(1); - let s = dirs - .context(UnableToProcessEntry) - .and_then(|entry| { - let file_path_buf: PathBuf = entry.file_name().into(); - async move { Ok(ObjectStorePath::from_path_buf_unchecked(file_path_buf)) } - }) - .try_filter(move |name| { - let matches = prefix.map_or(true, |p| name.prefix_matches(p)); - async move { matches } - }) - .map_ok(|name| vec![name]); - Ok(s) + let s = walkdir.into_iter().filter_map(move |result_dir_entry| { + result_dir_entry + .ok() + .filter(|dir_entry| dir_entry.file_type().is_file()) + .map(|file| { + let relative_path = file.path().strip_prefix(&root_path).expect( + "Must start with root path because this came from walking the root", + ); + ObjectStorePath::from_path_buf_unchecked(relative_path) + }) + .filter(|name| prefix.map_or(true, |p| name.prefix_matches(p))) + .map(|name| Ok(vec![name])) + }); + + Ok(stream::iter(s)) } } @@ -145,7 +147,6 @@ mod tests { use futures::stream; #[tokio::test] - #[ignore] async fn file_test() -> Result<()> { let root = TempDir::new()?; let integration = ObjectStore::new_file(File::new(root.path())); diff --git a/object_store/src/path.rs b/object_store/src/path.rs index 9078d63742..bbae268b3d 100644 --- a/object_store/src/path.rs +++ b/object_store/src/path.rs @@ -425,7 +425,7 @@ impl FileConverter { .map(|p| &p.0) .collect(); if let Some(file_name) = &dirs_and_file_name.file_name { - path.set_file_name(&file_name.0); + path.push(&file_name.0); } path }