From 06f1358e2db069b61572351ca35aba23c728e949 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 8 Jan 2021 10:31:06 -0500 Subject: [PATCH 1/7] feat: Change ObjectStorePath API to be more explicit Now you have to designate whether you're adding a directory or a file name, with some assumptions based on paths coming from a cloud object storage or the file system. A notable difference: checking to see if "apple/b" is a prefix of "apple/bear/cow.json" will now say no; only whole directories are matched. --- object_store/src/disk.rs | 4 +- object_store/src/lib.rs | 17 +- object_store/src/memory.rs | 43 ++-- object_store/src/path.rs | 489 ++++++++++++++++++++++++++++++------- server/src/lib.rs | 6 +- server/src/snapshot.rs | 14 +- src/server/http_routes.rs | 6 +- 7 files changed, 449 insertions(+), 130 deletions(-) diff --git a/object_store/src/disk.rs b/object_store/src/disk.rs index a9aacb5eca..32871728fc 100644 --- a/object_store/src/disk.rs +++ b/object_store/src/disk.rs @@ -124,7 +124,7 @@ impl File { async move { Ok(ObjectStorePath::from_path_buf_unchecked(file_path_buf)) } }) .try_filter(move |name| { - let matches = prefix.map_or(true, |p| name.starts_with(p)); + let matches = prefix.map_or(true, |p| name.prefix_matches(p)); async move { matches } }) .map_ok(|name| vec![name]); @@ -181,7 +181,7 @@ mod tests { let data = Bytes::from("arbitrary data"); let mut location = ObjectStorePath::default(); - location.push_all(&["nested", "file", "test_file"]); + location.push_all_dirs(&["nested", "file", "test_file"]); let stream_data = std::io::Result::Ok(data.clone()); storage diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index d29d81f53b..9ceddae62c 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -377,7 +377,8 @@ mod tests { let data = Bytes::from("arbitrary data"); let mut location = ObjectStorePath::default(); - location.push("test_file"); + location.push_dir("test_dir"); + location.set_file_name("test_file.json"); let stream_data = std::io::Result::Ok(data.clone()); storage @@ -394,13 +395,13 @@ mod tests { // List everything starting with a prefix that should return results let mut prefix = ObjectStorePath::default(); - prefix.push("test"); + prefix.push_dir("test_dir"); let content_list = flatten_list_stream(storage, Some(&prefix)).await?; assert_eq!(content_list, &[location.clone()]); // List everything starting with a prefix that shouldn't return results let mut prefix = ObjectStorePath::default(); - prefix.push("something"); + prefix.push_dir("something"); let content_list = flatten_list_stream(storage, Some(&prefix)).await?; assert!(content_list.is_empty()); @@ -454,17 +455,15 @@ mod tests { } let mut prefix = ObjectStorePath::default(); - prefix.push_all(&["mydb", "wal"]); + prefix.push_all_dirs(&["mydb", "wal"]); let mut expected_000 = prefix.clone(); - expected_000.push("000"); + expected_000.push_dir("000"); let mut expected_001 = prefix.clone(); - expected_001.push("001"); + expected_001.push_dir("001"); let mut expected_location = prefix.clone(); - expected_location.push("foo.test"); + expected_location.set_file_name("foo.test"); - // This is needed because we want a trailing slash on the prefix in this test - prefix.push(""); let result = storage.list_with_delimiter(&prefix).await.unwrap(); assert_eq!(result.common_prefixes, vec![expected_000, expected_001]); diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 121b32288c..2b35d67d1c 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -1,7 +1,8 @@ //! This module contains the IOx implementation for using memory as the object //! store. use crate::{ - path::ObjectStorePath, DataDoesNotMatchLength, ListResult, NoDataInMemory, ObjectMeta, Result, + path::{DirsAndFileName, ObjectStorePath}, + DataDoesNotMatchLength, ListResult, NoDataInMemory, ObjectMeta, Result, UnableToPutDataInMemory, }; use bytes::Bytes; @@ -16,7 +17,7 @@ use tokio::sync::RwLock; /// storage provider. #[derive(Debug, Default)] pub struct InMemory { - storage: RwLock>, + storage: RwLock>, } impl InMemory { @@ -56,7 +57,7 @@ impl InMemory { let content = content.freeze(); - self.storage.write().await.insert(location.clone(), content); + self.storage.write().await.insert(location.into(), content); Ok(()) } @@ -65,11 +66,12 @@ impl InMemory { &self, location: &ObjectStorePath, ) -> Result>> { + let location = location.into(); let data = self .storage .read() .await - .get(location) + .get(&location) .cloned() .context(NoDataInMemory)?; @@ -78,7 +80,7 @@ impl InMemory { /// Delete the object at the specified location. pub async fn delete(&self, location: &ObjectStorePath) -> Result<()> { - self.storage.write().await.remove(location); + self.storage.write().await.remove(&location.into()); Ok(()) } @@ -87,16 +89,18 @@ impl InMemory { &'a self, prefix: Option<&'a ObjectStorePath>, ) -> Result>> + 'a> { - let list = if let Some(prefix) = prefix { + let prefix = prefix.map(Into::into); + + let list = if let Some(prefix) = &prefix { self.storage .read() .await .keys() - .filter(|k| k.starts_with(prefix)) - .cloned() + .filter(|k| k.prefix_matches(prefix)) + .map(Into::into) .collect() } else { - self.storage.read().await.keys().cloned().collect() + self.storage.read().await.keys().map(Into::into).collect() }; Ok(futures::stream::once(async move { Ok(list) })) @@ -115,25 +119,27 @@ impl InMemory { let mut common_prefixes = BTreeSet::new(); let last_modified = Utc::now(); - // set the end prefix so we pull back everything that starts with - // the passed in prefix - let mut end_prefix = prefix.clone(); - end_prefix.pop(); - end_prefix.push("0"); + let prefix: DirsAndFileName = prefix.into(); // Only objects in this base level should be returned in the // response. Otherwise, we just collect the common prefixes. let mut objects = vec![]; - for (k, v) in self.storage.read().await.range(prefix.clone()..end_prefix) { + for (k, v) in self + .storage + .read() + .await + .range((&prefix)..) + .take_while(|(k, _)| k.prefix_matches(&prefix)) + { let parts = k.parts_after_prefix(&prefix); if parts.len() >= 2 { let mut full_prefix = prefix.clone(); - full_prefix.push_part(&parts[0]); + full_prefix.push_part_as_dir(&parts[0]); common_prefixes.insert(full_prefix); } else { let object = ObjectMeta { - location: k.clone(), + location: k.into(), last_modified, size: v.len(), }; @@ -143,7 +149,7 @@ impl InMemory { Ok(ListResult { objects, - common_prefixes: common_prefixes.into_iter().collect(), + common_prefixes: common_prefixes.into_iter().map(Into::into).collect(), next_token: None, }) } @@ -163,7 +169,6 @@ mod tests { use futures::stream; #[tokio::test] - #[ignore] async fn in_memory_test() -> Result<()> { let integration = ObjectStore::new_in_memory(InMemory::new()); diff --git a/object_store/src/path.rs b/object_store/src/path.rs index 72b6afa576..86429fe658 100644 --- a/object_store/src/path.rs +++ b/object_store/src/path.rs @@ -3,7 +3,7 @@ use itertools::Itertools; use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS}; -use std::path::PathBuf; +use std::{mem, path::PathBuf}; /// Universal interface for handling paths and locations for objects and /// directories in the object store. @@ -13,69 +13,62 @@ use std::path::PathBuf; /// /// Deliberately does not implement `Display` or `ToString`! Use one of the /// converters. -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default)] +#[derive(Default, Clone, PartialEq, Eq, Debug)] pub struct ObjectStorePath { - parts: Vec, + inner: PathRepresentation, } impl ObjectStorePath { /// For use when receiving a path from an object store API directly, not /// when building a path. Assumes DELIMITER is the separator. /// - /// TODO: Improve performance by implementing a CoW-type model to delay - /// parsing until needed TODO: This should only be available to cloud - /// storage + /// TODO: This should only be available to cloud storage pub fn from_cloud_unchecked(path: impl Into) -> Self { let path = path.into(); Self { - parts: path - .split_terminator(DELIMITER) - .map(|s| PathPart(s.to_string())) - .collect(), + inner: PathRepresentation::RawCloud(path), } } /// For use when receiving a path from a filesystem directly, not /// when building a path. Uses the standard library's path splitting /// implementation to separate into parts. + /// + /// TODO: This should only be available to file storage pub fn from_path_buf_unchecked(path: impl Into) -> Self { let path = path.into(); Self { - parts: path - .iter() - .flat_map(|s| s.to_os_string().into_string().map(PathPart)) - .collect(), + inner: PathRepresentation::RawPathBuf(path), } } /// Add a part to the end of the path, encoding any restricted characters. - pub fn push(&mut self, part: impl Into) { - let part = part.into(); - self.parts.push((&*part).into()); + pub fn push_dir(&mut self, part: impl Into) { + self.inner = mem::take(&mut self.inner).push_dir(part); } - /// Add a `PathPart` to the end of the path. Infallible because the - /// `PathPart` should already have been checked for restricted - /// characters. - pub fn push_part(&mut self, part: &PathPart) { - self.parts.push(part.to_owned()); + /// Add a `PathPart` to the end of the path. + pub fn push_part_as_dir(&mut self, part: &PathPart) { + self.inner = mem::take(&mut self.inner).push_part_as_dir(part); + } + + /// Set the file name of this path + pub fn set_file_name(&mut self, part: impl Into) { + self.inner = mem::take(&mut self.inner).set_file_name(part); } /// Add the parts of `ObjectStorePath` to the end of the path. Notably does - /// *not* behave as `PathBuf::push` does: no existing part of `self` - /// will be replaced as part of this call. + /// *not* behave as `PathBuf::push` does: there is no way to replace the + /// root. If `self` has a file name, that will be removed, then the + /// directories of `path` will be appended, then any file name of `path` + /// will be assigned to `self`. pub fn push_path(&mut self, path: &Self) { - self.parts.extend_from_slice(&path.parts); + self.inner = mem::take(&mut self.inner).push_path(path) } - /// Push a bunch of parts in one go. - pub fn push_all<'a>(&mut self, parts: impl AsRef<[&'a str]>) { - self.parts.extend(parts.as_ref().iter().map(|&v| v.into())); - } - - /// Return the component parts of the path. - pub fn as_parts(&self) -> &[PathPart] { - self.parts.as_ref() + /// Push a bunch of parts as directories in one go. + pub fn push_all_dirs<'a>(&mut self, parts: impl AsRef<[&'a str]>) { + self.inner = mem::take(&mut self.inner).push_all_dirs(parts); } /// Pops a part from the path and returns it, or `None` if it's empty. @@ -83,25 +76,26 @@ impl ObjectStorePath { unimplemented!() } - /// Determines whether `prefix` is a prefix of `self`. - pub fn starts_with(&self, prefix: &Self) -> bool { - let diff = itertools::diff_with(self.parts.iter(), prefix.parts.iter(), |a, b| a == b); - match diff { - None => true, - Some(itertools::Diff::Shorter(..)) => true, - Some(itertools::Diff::FirstMismatch(_, mut remaining_self, mut remaining_prefix)) => { - let first_prefix = remaining_prefix.next().expect("must be at least one value"); - - // there must not be any other remaining parts in the prefix - remaining_prefix.next().is_none() - // and the next item in self must start with the last item in the prefix - && remaining_self - .next() - .expect("must be at least one value") - .0 - .starts_with(&first_prefix.0) + /// Returns true if the directories in `prefix` are the same as the starting + /// directories of `self`. Does not use filenames; does not match + /// partial directory names. + pub fn prefix_matches(&self, prefix: &Self) -> bool { + use PathRepresentation::*; + match (&self.inner, &prefix.inner) { + (Parts(self_parts), Parts(other_parts)) => self_parts.prefix_matches(&other_parts), + (Parts(self_parts), _) => { + let prefix_parts: DirsAndFileName = prefix.into(); + self_parts.prefix_matches(&prefix_parts) + } + (_, Parts(prefix_parts)) => { + let self_parts: DirsAndFileName = self.into(); + self_parts.prefix_matches(&prefix_parts) + } + _ => { + let self_parts: DirsAndFileName = self.into(); + let prefix_parts: DirsAndFileName = prefix.into(); + self_parts.prefix_matches(&prefix_parts) } - _ => false, } } @@ -111,6 +105,202 @@ impl ObjectStorePath { } } +impl From<&'_ DirsAndFileName> for ObjectStorePath { + fn from(other: &'_ DirsAndFileName) -> Self { + other.clone().into() + } +} + +impl From for ObjectStorePath { + fn from(other: DirsAndFileName) -> Self { + Self { + inner: PathRepresentation::Parts(other), + } + } +} + +#[derive(Clone, Eq, Debug)] +enum PathRepresentation { + RawCloud(String), + RawPathBuf(PathBuf), + Parts(DirsAndFileName), +} + +impl Default for PathRepresentation { + fn default() -> Self { + Self::Parts(DirsAndFileName::default()) + } +} + +impl PathRepresentation { + /// Add a part to the end of the path's directories, encoding any restricted + /// characters. + fn push_dir(self, part: impl Into) -> Self { + let mut dirs_and_file_name: DirsAndFileName = self.into(); + + dirs_and_file_name.push_dir(part); + Self::Parts(dirs_and_file_name) + } + + /// Push a bunch of parts as directories in one go. + fn push_all_dirs<'a>(self, parts: impl AsRef<[&'a str]>) -> Self { + let mut dirs_and_file_name: DirsAndFileName = self.into(); + + dirs_and_file_name + .directories + .extend(parts.as_ref().iter().map(|&v| v.into())); + + Self::Parts(dirs_and_file_name) + } + + /// Add a `PathPart` to the end of the path's directories. + fn push_part_as_dir(self, part: &PathPart) -> Self { + let mut dirs_and_file_name: DirsAndFileName = self.into(); + + dirs_and_file_name.push_part_as_dir(part); + Self::Parts(dirs_and_file_name) + } + + /// Add the parts of `ObjectStorePath` to the end of the path. Notably does + /// *not* behave as `PathBuf::push` does: there is no way to replace the + /// root. If `self` has a file name, that will be removed, then the + /// directories of `path` will be appended, then any file name of `path` + /// will be assigned to `self`. + fn push_path(self, path: &ObjectStorePath) -> Self { + let DirsAndFileName { + directories: path_dirs, + file_name: path_file_name, + } = path.inner.to_owned().into(); + let mut dirs_and_file_name: DirsAndFileName = self.into(); + + dirs_and_file_name.directories.extend(path_dirs); + dirs_and_file_name.file_name = path_file_name; + + Self::Parts(dirs_and_file_name) + } + + /// Set the file name of this path + fn set_file_name(self, part: impl Into) -> Self { + let part = part.into(); + let mut dirs_and_file_name: DirsAndFileName = self.into(); + + dirs_and_file_name.file_name = Some((&*part).into()); + Self::Parts(dirs_and_file_name) + } +} + +impl PartialEq for PathRepresentation { + fn eq(&self, other: &Self) -> bool { + use PathRepresentation::*; + match (self, other) { + (Parts(self_parts), Parts(other_parts)) => self_parts == other_parts, + (Parts(self_parts), _) => { + let other_parts: DirsAndFileName = other.to_owned().into(); + *self_parts == other_parts + } + (_, Parts(other_parts)) => { + let self_parts: DirsAndFileName = self.to_owned().into(); + self_parts == *other_parts + } + _ => { + let self_parts: DirsAndFileName = self.to_owned().into(); + let other_parts: DirsAndFileName = other.to_owned().into(); + self_parts == other_parts + } + } + } +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Default)] +pub(crate) struct DirsAndFileName { + directories: Vec, + file_name: Option, +} + +impl DirsAndFileName { + pub(crate) fn prefix_matches(&self, prefix: &Self) -> bool { + let diff = itertools::diff_with( + self.directories.iter(), + prefix.directories.iter(), + |a, b| a == b, + ); + matches!(diff, None | Some(itertools::Diff::Shorter(..))) + } + + pub(crate) fn parts_after_prefix(&self, _prefix: &Self) -> &[PathPart] { + unimplemented!() + } + + /// Add a part to the end of the path's directories, encoding any restricted + /// characters. + fn push_dir(&mut self, part: impl Into) { + let part = part.into(); + self.directories.push((&*part).into()); + } + + /// Add a `PathPart` to the end of the path's directories. + pub(crate) fn push_part_as_dir(&mut self, part: &PathPart) { + self.directories.push(part.to_owned()); + } +} + +impl From for DirsAndFileName { + fn from(path_rep: PathRepresentation) -> Self { + match path_rep { + PathRepresentation::RawCloud(path) => { + let mut parts: Vec<_> = path + .split_terminator(DELIMITER) + .map(|s| PathPart(s.to_string())) + .collect(); + let maybe_file_name = match parts.pop() { + Some(file) if file.0.contains('.') => Some(file), + Some(dir) => { + parts.push(dir); + None + } + None => None, + }; + Self { + directories: parts, + file_name: maybe_file_name, + } + } + PathRepresentation::RawPathBuf(path) => { + let mut parts: Vec<_> = path + .iter() + .flat_map(|s| s.to_os_string().into_string().map(PathPart)) + .collect(); + + let maybe_file_name = match parts.pop() { + Some(file) if file.0.contains('.') => Some(file), + Some(dir) => { + parts.push(dir); + None + } + None => None, + }; + Self { + directories: parts, + file_name: maybe_file_name, + } + } + PathRepresentation::Parts(dirs_and_file_name) => dirs_and_file_name, + } + } +} + +impl From<&'_ ObjectStorePath> for DirsAndFileName { + fn from(other: &'_ ObjectStorePath) -> Self { + other.clone().into() + } +} + +impl From for DirsAndFileName { + fn from(other: ObjectStorePath) -> Self { + other.inner.into() + } +} + // TODO: I made these structs rather than functions because I could see // `convert` being part of a trait, possibly, but that seemed a bit overly // complex for now. @@ -124,7 +314,27 @@ impl CloudConverter { /// Creates a cloud storage location by joining this `ObjectStorePath`'s /// parts with `DELIMITER` pub fn convert(object_store_path: &ObjectStorePath) -> String { - object_store_path.parts.iter().map(|p| &p.0).join(DELIMITER) + match &object_store_path.inner { + PathRepresentation::RawCloud(path) => path.to_owned(), + PathRepresentation::RawPathBuf(_path) => { + todo!("convert"); + } + PathRepresentation::Parts(dirs_and_file_name) => { + let mut path = dirs_and_file_name + .directories + .iter() + .map(|p| &p.0) + .join(DELIMITER); + + if !path.is_empty() { + path.push_str(DELIMITER); + } + if let Some(file_name) = &dirs_and_file_name.file_name { + path.push_str(&file_name.0); + } + path + } + } } } @@ -138,7 +348,23 @@ impl FileConverter { /// `PathBuf` building implementation appropriate for the current /// platform. pub fn convert(object_store_path: &ObjectStorePath) -> PathBuf { - object_store_path.parts.iter().map(|p| &p.0).collect() + match &object_store_path.inner { + PathRepresentation::RawCloud(_path) => { + todo!("convert"); + } + PathRepresentation::RawPathBuf(path) => path.to_owned(), + PathRepresentation::Parts(dirs_and_file_name) => { + let mut path: PathBuf = dirs_and_file_name + .directories + .iter() + .map(|p| &p.0) + .collect(); + if let Some(file_name) = &dirs_and_file_name.file_name { + path.set_file_name(&file_name.0); + } + path + } + } } } @@ -254,12 +480,12 @@ mod tests { // #[test] - fn cloud_prefix_no_trailing_delimiter_or_filename() { + fn cloud_prefix_no_trailing_delimiter_or_file_name() { // Use case: a file named `test_file.json` exists in object storage and it // should be returned for a search on prefix `test`, so the prefix path // should not get a trailing delimiter automatically added let mut prefix = ObjectStorePath::default(); - prefix.push("test"); + prefix.set_file_name("test"); let converted = CloudConverter::convert(&prefix); assert_eq!(converted, "test"); @@ -271,7 +497,7 @@ mod tests { // `foo_test.json`. A search for the prefix `foo/` should return // `foo/bar.json` but not `foo_test.json'. let mut prefix = ObjectStorePath::default(); - prefix.push_all(&["test", ""]); + prefix.push_dir("test"); let converted = CloudConverter::convert(&prefix); assert_eq!(converted, "test/"); @@ -280,82 +506,169 @@ mod tests { #[test] fn push_encodes() { let mut location = ObjectStorePath::default(); - location.push("foo/bar"); - location.push("baz%2Ftest"); + location.push_dir("foo/bar"); + location.push_dir("baz%2Ftest"); let converted = CloudConverter::convert(&location); - assert_eq!(converted, "foo%2Fbar/baz%252Ftest"); + assert_eq!(converted, "foo%2Fbar/baz%252Ftest/"); } #[test] fn push_all_encodes() { let mut location = ObjectStorePath::default(); - location.push_all(&["foo/bar", "baz%2Ftest"]); + location.push_all_dirs(&["foo/bar", "baz%2Ftest"]); let converted = CloudConverter::convert(&location); - assert_eq!(converted, "foo%2Fbar/baz%252Ftest"); + assert_eq!(converted, "foo%2Fbar/baz%252Ftest/"); } #[test] - fn starts_with_parts() { + fn prefix_matches_full_dirs() { let mut haystack = ObjectStorePath::default(); - haystack.push_all(&["foo/bar", "baz%2Ftest", "something"]); + haystack.push_all_dirs(&["foo/bar", "baz%2Ftest", "something"]); + // self starts with self assert!( - haystack.starts_with(&haystack), + haystack.prefix_matches(&haystack), "{:?} should have started with {:?}", haystack, haystack ); + // a longer prefix doesn't match let mut needle = haystack.clone(); - needle.push("longer now"); + needle.push_dir("longer now"); assert!( - !haystack.starts_with(&needle), + !haystack.prefix_matches(&needle), "{:?} shouldn't have started with {:?}", haystack, needle ); + // one dir prefix matches let mut needle = ObjectStorePath::default(); - needle.push("foo/bar"); + needle.push_dir("foo/bar"); assert!( - haystack.starts_with(&needle), - "{:?} should have started with {:?}", - haystack, - needle - ); - needle.push("baz%2Ftest"); - assert!( - haystack.starts_with(&needle), + haystack.prefix_matches(&needle), "{:?} should have started with {:?}", haystack, needle ); - let mut needle = ObjectStorePath::default(); - needle.push("f"); + // two dir prefix matches + needle.push_dir("baz%2Ftest"); assert!( - haystack.starts_with(&needle), + haystack.prefix_matches(&needle), "{:?} should have started with {:?}", haystack, needle ); - needle.push("oo/bar"); - assert!( - !haystack.starts_with(&needle), - "{:?} shouldn't have started with {:?}", - haystack, - needle - ); + // partial dir prefix does NOT match, this may be surprising! let mut needle = ObjectStorePath::default(); - needle.push_all(&["foo/bar", "baz"]); + needle.push_dir("f"); assert!( - haystack.starts_with(&needle), - "{:?} should have started with {:?}", + !haystack.prefix_matches(&needle), + "{:?} should not have started with {:?}", + haystack, + needle + ); + + // one dir and one partial dir does NOT match, this may be surprising! + let mut needle = ObjectStorePath::default(); + needle.push_all_dirs(&["foo/bar", "baz"]); + assert!( + !haystack.prefix_matches(&needle), + "{:?} should not have started with {:?}", haystack, needle ); } + + #[test] + fn convert_raw_before_partial_eq() { + // dir and file_name + let cloud = ObjectStorePath::from_cloud_unchecked("test_dir/test_file.json"); + let mut built = ObjectStorePath::default(); + built.push_dir("test_dir"); + built.set_file_name("test_file.json"); + + assert_eq!(built, cloud); + + // dir, no file_name + let cloud = ObjectStorePath::from_cloud_unchecked("test_dir"); + let mut built = ObjectStorePath::default(); + built.push_dir("test_dir"); + + assert_eq!(built, cloud); + + // file_name, no dir + let cloud = ObjectStorePath::from_cloud_unchecked("test_file.json"); + let mut built = ObjectStorePath::default(); + built.set_file_name("test_file.json"); + + assert_eq!(built, cloud); + + // empty + let cloud = ObjectStorePath::from_cloud_unchecked(""); + let built = ObjectStorePath::default(); + + assert_eq!(built, cloud); + } + + #[test] + fn path_rep_conversions() { + // dir and file name + let cloud = PathRepresentation::RawCloud("foo/bar/blah.json".into()); + let cloud_parts: DirsAndFileName = cloud.into(); + + let path_buf = PathRepresentation::RawPathBuf("foo/bar/blah.json".into()); + let path_buf_parts: DirsAndFileName = path_buf.into(); + + let mut expected_parts = DirsAndFileName::default(); + expected_parts.push_dir("foo"); + expected_parts.push_dir("bar"); + expected_parts.file_name = Some("blah.json".into()); + + assert_eq!(cloud_parts, expected_parts); + assert_eq!(path_buf_parts, expected_parts); + + // dir, no file name + let cloud = PathRepresentation::RawCloud("foo/bar".into()); + let cloud_parts: DirsAndFileName = cloud.into(); + + let path_buf = PathRepresentation::RawPathBuf("foo/bar".into()); + let path_buf_parts: DirsAndFileName = path_buf.into(); + + expected_parts.file_name = None; + + assert_eq!(cloud_parts, expected_parts); + assert_eq!(path_buf_parts, expected_parts); + + // no dir, file name + let cloud = PathRepresentation::RawCloud("blah.json".into()); + let cloud_parts: DirsAndFileName = cloud.into(); + + let path_buf = PathRepresentation::RawPathBuf("blah.json".into()); + let path_buf_parts: DirsAndFileName = path_buf.into(); + + assert!(cloud_parts.directories.is_empty()); + assert_eq!(cloud_parts.file_name.unwrap().0, "blah.json"); + + assert!(path_buf_parts.directories.is_empty()); + assert_eq!(path_buf_parts.file_name.unwrap().0, "blah.json"); + + // empty + let cloud = PathRepresentation::RawCloud("".into()); + let cloud_parts: DirsAndFileName = cloud.into(); + + let path_buf = PathRepresentation::RawPathBuf("".into()); + let path_buf_parts: DirsAndFileName = path_buf.into(); + + assert!(cloud_parts.directories.is_empty()); + assert!(cloud_parts.file_name.is_none()); + + assert!(path_buf_parts.directories.is_empty()); + assert!(path_buf_parts.file_name.is_none()); + } } diff --git a/server/src/lib.rs b/server/src/lib.rs index ec9fdaa395..f6e13da1a4 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -487,7 +487,8 @@ impl RemoteServer for RemoteServerImpl { // location in the store for the configuration file fn config_location(id: u32) -> ObjectStorePath { let mut path = ObjectStorePath::default(); - path.push_all(&[&id.to_string(), "config.json"]); + path.push_dir(id.to_string()); + path.set_file_name("config.json"); path } @@ -748,7 +749,8 @@ partition_key: server.store_configuration().await.unwrap(); let mut location = ObjectStorePath::default(); - location.push_all(&["1", "config.json"]); + location.push_dir("1"); + location.set_file_name("config.json"); let read_data = server .store diff --git a/server/src/snapshot.rs b/server/src/snapshot.rs index ae83823254..9863579a92 100644 --- a/server/src/snapshot.rs +++ b/server/src/snapshot.rs @@ -156,7 +156,7 @@ where let mut location = self.data_path.clone(); let file_name = format!("{}.parquet", table_name); - location.push(&file_name); + location.set_file_name(&file_name); self.write_batches(batches, &location).await?; self.mark_table_finished(pos); @@ -167,7 +167,7 @@ where let mut partition_meta_path = self.metadata_path.clone(); let key = format!("{}.json", &self.partition_meta.key); - partition_meta_path.push(&key); + partition_meta_path.set_file_name(&key); let json_data = serde_json::to_vec(&self.partition_meta).context(JsonGenerationError)?; let data = Bytes::from(json_data); let len = data.len(); @@ -363,10 +363,10 @@ mem,host=A,region=west used=45 1 let chunk = Arc::new(chunk); let (tx, rx) = tokio::sync::oneshot::channel(); let mut metadata_path = ObjectStorePath::default(); - metadata_path.push("meta"); + metadata_path.push_dir("meta"); let mut data_path = ObjectStorePath::default(); - data_path.push("data"); + data_path.push_dir("data"); let snapshot = snapshot_chunk( metadata_path.clone(), @@ -381,7 +381,7 @@ mem,host=A,region=west used=45 1 rx.await.unwrap(); let mut location = metadata_path; - location.push("testaroo.json"); + location.set_file_name("testaroo.json"); let summary = store .get(&location) @@ -416,10 +416,10 @@ mem,host=A,region=west used=45 1 let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); let chunk = Arc::new(ChunkWB::new(11)); let mut metadata_path = ObjectStorePath::default(); - metadata_path.push("meta"); + metadata_path.push_dir("meta"); let mut data_path = ObjectStorePath::default(); - data_path.push("data"); + data_path.push_dir("data"); let snapshot = Snapshot::new("testaroo", metadata_path, data_path, store, chunk, tables); diff --git a/src/server/http_routes.rs b/src/server/http_routes.rs index 59b3282dcb..db968faeeb 100644 --- a/src/server/http_routes.rs +++ b/src/server/http_routes.rs @@ -636,10 +636,10 @@ async fn snapshot_partition Date: Mon, 11 Jan 2021 12:45:19 -0500 Subject: [PATCH 2/7] fix: Implement parts_after_prefix; InMemory now passes --- object_store/src/memory.rs | 4 +- object_store/src/path.rs | 90 +++++++++++++++++++++++++++++++++----- 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 2b35d67d1c..f1d59bb58d 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -131,7 +131,9 @@ impl InMemory { .range((&prefix)..) .take_while(|(k, _)| k.prefix_matches(&prefix)) { - let parts = k.parts_after_prefix(&prefix); + let parts = k + .parts_after_prefix(&prefix) + .expect("must have prefix if in range"); if parts.len() >= 2 { let mut full_prefix = prefix.clone(); diff --git a/object_store/src/path.rs b/object_store/src/path.rs index 86429fe658..aaf8483ecc 100644 --- a/object_store/src/path.rs +++ b/object_store/src/path.rs @@ -98,11 +98,6 @@ impl ObjectStorePath { } } } - - /// Returns delimiter-separated parts contained in `self` after `prefix`. - pub fn parts_after_prefix(&self, _prefix: &Self) -> &[PathPart] { - unimplemented!() - } } impl From<&'_ DirsAndFileName> for ObjectStorePath { @@ -146,9 +141,7 @@ impl PathRepresentation { fn push_all_dirs<'a>(self, parts: impl AsRef<[&'a str]>) -> Self { let mut dirs_and_file_name: DirsAndFileName = self.into(); - dirs_and_file_name - .directories - .extend(parts.as_ref().iter().map(|&v| v.into())); + dirs_and_file_name.push_all_dirs(parts); Self::Parts(dirs_and_file_name) } @@ -227,8 +220,35 @@ impl DirsAndFileName { matches!(diff, None | Some(itertools::Diff::Shorter(..))) } - pub(crate) fn parts_after_prefix(&self, _prefix: &Self) -> &[PathPart] { - unimplemented!() + /// Returns all directory and file name `PathParts` in `self` after the + /// specified `prefix`. Ignores any `file_name` part of `prefix`. + /// Returns `None` if `self` dosen't start with `prefix`. + pub(crate) fn parts_after_prefix(&self, prefix: &Self) -> Option> { + let mut dirs_iter = self.directories.iter(); + let mut prefix_dirs_iter = prefix.directories.iter(); + + let mut parts = vec![]; + + for dir in &mut dirs_iter { + let pre = prefix_dirs_iter.next(); + + match pre { + None => { + parts.push(dir.to_owned()); + break; + } + Some(p) if p == dir => continue, + Some(_) => return None, + } + } + + parts.extend(dirs_iter.cloned()); + + if let Some(file_name) = &self.file_name { + parts.push(file_name.to_owned()); + } + + Some(parts) } /// Add a part to the end of the path's directories, encoding any restricted @@ -238,6 +258,12 @@ impl DirsAndFileName { self.directories.push((&*part).into()); } + /// Push a bunch of parts as directories in one go. + fn push_all_dirs<'a>(&mut self, parts: impl AsRef<[&'a str]>) { + self.directories + .extend(parts.as_ref().iter().map(|&v| v.into())); + } + /// Add a `PathPart` to the end of the path's directories. pub(crate) fn push_part_as_dir(&mut self, part: &PathPart) { self.directories.push(part.to_owned()); @@ -671,4 +697,48 @@ mod tests { assert!(path_buf_parts.directories.is_empty()); assert!(path_buf_parts.file_name.is_none()); } + + #[test] + fn parts_after_prefix_behavior() { + let mut existing_path = DirsAndFileName::default(); + existing_path.push_all_dirs(&["apple", "bear", "cow", "dog"]); + existing_path.file_name = Some("egg.json".into()); + + // Prefix with one directory + let mut prefix = DirsAndFileName::default(); + prefix.push_dir("apple"); + let expected_parts: Vec = vec!["bear", "cow", "dog", "egg.json"] + .into_iter() + .map(Into::into) + .collect(); + let parts = existing_path.parts_after_prefix(&prefix).unwrap(); + assert_eq!(parts, expected_parts); + + // Prefix with two directories + let mut prefix = DirsAndFileName::default(); + prefix.push_all_dirs(&["apple", "bear"]); + let expected_parts: Vec = vec!["cow", "dog", "egg.json"] + .into_iter() + .map(Into::into) + .collect(); + let parts = existing_path.parts_after_prefix(&prefix).unwrap(); + assert_eq!(parts, expected_parts); + + // Not a prefix + let mut prefix = DirsAndFileName::default(); + prefix.push_dir("cow"); + assert!(existing_path.parts_after_prefix(&prefix).is_none()); + + // Prefix with a partial directory + let mut prefix = DirsAndFileName::default(); + prefix.push_dir("ap"); + assert!(existing_path.parts_after_prefix(&prefix).is_none()); + + // Prefix matches but there aren't any parts after it + let mut existing_path = DirsAndFileName::default(); + existing_path.push_all_dirs(&["apple", "bear", "cow", "dog"]); + let prefix = existing_path.clone(); + let parts = existing_path.parts_after_prefix(&prefix).unwrap(); + assert!(parts.is_empty()); + } } From f0ab0e25a00872cec95d5e628121b05aeabbe803 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 11 Jan 2021 12:57:07 -0500 Subject: [PATCH 3/7] fix: Match partial directory names with prefix --- object_store/src/path.rs | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/object_store/src/path.rs b/object_store/src/path.rs index aaf8483ecc..fa40bd017d 100644 --- a/object_store/src/path.rs +++ b/object_store/src/path.rs @@ -217,7 +217,27 @@ impl DirsAndFileName { prefix.directories.iter(), |a, b| a == b, ); - matches!(diff, None | Some(itertools::Diff::Shorter(..))) + + use itertools::Diff; + match diff { + None => true, + Some(Diff::Shorter(..)) => true, + Some(Diff::FirstMismatch(_, mut remaining_self, mut remaining_prefix)) => { + let first_prefix = remaining_prefix + .next() + .expect("must have at least one mismatch to be in this case"); + + // There must not be any other remaining parts in the prefix + remaining_prefix.next().is_none() + // and the next item in self must start with the last item in the prefix + && remaining_self + .next() + .expect("must be at least one value") + .0 + .starts_with(&first_prefix.0) + } + _ => false, + } } /// Returns all directory and file name `PathParts` in `self` after the @@ -549,7 +569,7 @@ mod tests { } #[test] - fn prefix_matches_full_dirs() { + fn prefix_matches() { let mut haystack = ObjectStorePath::default(); haystack.push_all_dirs(&["foo/bar", "baz%2Ftest", "something"]); @@ -590,22 +610,22 @@ mod tests { needle ); - // partial dir prefix does NOT match, this may be surprising! + // partial dir prefix matches let mut needle = ObjectStorePath::default(); needle.push_dir("f"); assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", + haystack.prefix_matches(&needle), + "{:?} should have started with {:?}", haystack, needle ); - // one dir and one partial dir does NOT match, this may be surprising! + // one dir and one partial dir matches let mut needle = ObjectStorePath::default(); needle.push_all_dirs(&["foo/bar", "baz"]); assert!( - !haystack.prefix_matches(&needle), - "{:?} should not have started with {:?}", + haystack.prefix_matches(&needle), + "{:?} should have started with {:?}", haystack, needle ); From 8570c88689f06c15894065ee7f4424f83df947c0 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 11 Jan 2021 13:59:10 -0500 Subject: [PATCH 4/7] test: Add a partial file name scenario to list_with_delimiter tests --- object_store/src/lib.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 9ceddae62c..9f915010be 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -475,6 +475,23 @@ mod tests { assert_eq!(object.size, data.len()); assert!(object.last_modified > time_before_creation); + // List with a prefix containing a partial "file name" + let mut prefix = ObjectStorePath::default(); + prefix.push_all_dirs(&["mydb", "wal", "000", "000"]); + prefix.set_file_name("001"); + + let mut expected_location = ObjectStorePath::default(); + expected_location.push_all_dirs(&["mydb", "wal", "000", "000"]); + expected_location.set_file_name("001.segment"); + + let result = storage.list_with_delimiter(&prefix).await.unwrap(); + assert!(result.common_prefixes.is_empty()); + assert_eq!(result.objects.len(), 1); + + let object = &result.objects[0]; + + assert_eq!(object.location, expected_location); + for f in &files { storage.delete(f).await.unwrap(); } From 383e3abfce0960e1d16150bada3b24ae906a7711 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 11 Jan 2021 14:03:08 -0500 Subject: [PATCH 5/7] fix: Remove dbgs --- object_store/src/aws.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs index d34df549f5..a4b2f5ddfa 100644 --- a/object_store/src/aws.rs +++ b/object_store/src/aws.rs @@ -200,9 +200,8 @@ impl AmazonS3 { prefix: &'a ObjectStorePath, next_token: &Option, ) -> Result { - dbg!(&prefix); let converted_prefix = CloudConverter::convert(prefix); - dbg!(&converted_prefix); + let mut list_request = rusoto_s3::ListObjectsV2Request { bucket: self.bucket_name.clone(), prefix: Some(converted_prefix), @@ -225,7 +224,7 @@ impl AmazonS3 { }; let contents = resp.contents.unwrap_or_default(); - dbg!(&contents); + let objects: Vec<_> = contents .into_iter() .map(|object| { @@ -256,8 +255,6 @@ impl AmazonS3 { }) .collect(); - dbg!(&resp.common_prefixes); - let common_prefixes = resp .common_prefixes .unwrap_or_default() From ea719724e911e06461fddc24b5384052cc747782 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 13 Jan 2021 09:31:40 -0500 Subject: [PATCH 6/7] test: Add another filename to clarify behavior in this test --- object_store/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 9f915010be..dc01ef7096 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -432,6 +432,7 @@ mod tests { let files: Vec<_> = [ "mydb/wal/000/000/000.segment", "mydb/wal/000/000/001.segment", + "mydb/wal/000/000/002.segment", "mydb/wal/001/001/000.segment", "mydb/wal/foo.test", "mydb/data/whatevs", @@ -526,6 +527,7 @@ mod tests { "test_file", "mydb/wal/000/000/000.segment", "mydb/wal/000/000/001.segment", + "mydb/wal/000/000/002.segment", "mydb/wal/001/001/000.segment", "mydb/wal/foo.test", "mydb/data/whatevs", From 4acf0f6ea916c640ed285ebdfa7e9329990aca2d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 13 Jan 2021 09:57:11 -0500 Subject: [PATCH 7/7] fix: Take prefix's file name into account in This makes the in memory object store behave consistently with the cloud object stores. --- object_store/src/path.rs | 73 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 4 deletions(-) diff --git a/object_store/src/path.rs b/object_store/src/path.rs index fa40bd017d..dff1441836 100644 --- a/object_store/src/path.rs +++ b/object_store/src/path.rs @@ -77,8 +77,7 @@ impl ObjectStorePath { } /// Returns true if the directories in `prefix` are the same as the starting - /// directories of `self`. Does not use filenames; does not match - /// partial directory names. + /// directories of `self`. pub fn prefix_matches(&self, prefix: &Self) -> bool { use PathRepresentation::*; match (&self.inner, &prefix.inner) { @@ -220,8 +219,21 @@ impl DirsAndFileName { use itertools::Diff; match diff { - None => true, - Some(Diff::Shorter(..)) => true, + None => match (self.file_name.as_ref(), prefix.file_name.as_ref()) { + (Some(self_file), Some(prefix_file)) => self_file.0.starts_with(&prefix_file.0), + (Some(_self_file), None) => true, + (None, Some(_prefix_file)) => false, + (None, None) => true, + }, + Some(Diff::Shorter(_, mut remaining_self)) => { + let next_dir = remaining_self + .next() + .expect("must have at least one mismatch to be in this case"); + match prefix.file_name.as_ref() { + Some(prefix_file) => next_dir.0.starts_with(&prefix_file.0), + None => true, + } + } Some(Diff::FirstMismatch(_, mut remaining_self, mut remaining_prefix)) => { let first_prefix = remaining_prefix .next() @@ -631,6 +643,59 @@ mod tests { ); } + #[test] + fn prefix_matches_with_file_name() { + let mut haystack = ObjectStorePath::default(); + haystack.push_all_dirs(&["foo/bar", "baz%2Ftest", "something"]); + + let mut needle = haystack.clone(); + + // All directories match and file name is a prefix + haystack.set_file_name("foo.segment"); + needle.set_file_name("foo"); + + assert!( + haystack.prefix_matches(&needle), + "{:?} should have started with {:?}", + haystack, + needle + ); + + // All directories match but file name is not a prefix + needle.set_file_name("e"); + + assert!( + !haystack.prefix_matches(&needle), + "{:?} should not have started with {:?}", + haystack, + needle + ); + + // Not all directories match; file name is a prefix of the next directory; this + // matches + let mut needle = ObjectStorePath::default(); + needle.push_all_dirs(&["foo/bar", "baz%2Ftest"]); + needle.set_file_name("s"); + + assert!( + haystack.prefix_matches(&needle), + "{:?} should have started with {:?}", + haystack, + needle + ); + + // Not all directories match; file name is NOT a prefix of the next directory; + // no match + needle.set_file_name("p"); + + assert!( + !haystack.prefix_matches(&needle), + "{:?} should not have started with {:?}", + haystack, + needle + ); + } + #[test] fn convert_raw_before_partial_eq() { // dir and file_name